2013-06-01 29 views
8

Tôi xin lỗi trước về độ dài của ghi chú này. Tôi đã dành thời gian đáng kể làm cho nó ngắn hơn, và điều này là nhỏ như tôi có thể nhận được nó.rxjava và bí mật không đồng bộ clojure: tương lai hứa hẹn và đại lý, oh của tôi

Tôi có bí ẩn và sẽ biết ơn sự giúp đỡ của bạn. Bí ẩn này xuất phát từ hành vi của một rxjava observer Tôi đã viết trong Clojure qua một vài nếp gấp đơn giản của observable s từ các mẫu trực tuyến.

Một quan sát có thể đồng bộ gửi tin nhắn tới các trình xử lý của các quan sát viên của mình và người quan sát được cho là nguyên tắc của tôi hoạt động như mong đợi.

Các quan sát không đồng bộ khác thực hiện tương tự, trên một chuỗi khác, qua Clojure future. Người quan sát chính xác không chụp tất cả các sự kiện được đăng lên onNext; nó dường như mất một số lượng tin nhắn ngẫu nhiên ở đuôi.

Có một cuộc đua cố ý sau đây khi hết hạn chờ đợi promise d onCompleted và hết hạn chờ đợi cho tất cả các sự kiện được gửi đến một nhà sưu tập agent. Nếu chiến thắng promise, tôi hy vọng sẽ thấy false cho onCompleted và hàng đợi có thể ngắn trong agent. Nếu số agent thắng, tôi hy vọng sẽ thấy true cho onCompleted và tất cả thư từ hàng đợi của agent. Một kết quả tôi KHÔNG mong đợi là true cho onCompleted VÀ một hàng đợi ngắn từ agent. Nhưng, Murphy không ngủ, và đó là chính xác những gì tôi thấy. Tôi không biết liệu bộ sưu tập rác có bị lỗi hay không, hoặc một số hàng đợi bên trong để STM của Clojure, hoặc sự ngu xuẩn của tôi, hoặc một thứ gì đó hoàn toàn khác.

Tôi trình bày nguồn theo thứ tự của biểu mẫu khép kín, tại đây, để nó có thể được chạy trực tiếp qua lein repl. Có ba cermonials để có được ra khỏi con đường: thứ nhất, hồ sơ dự án leiningen, project.clj, mà tuyên bố sự phụ thuộc vào phiên bản 0.9.0 của rxjava của Netflix:

(defproject expt2 "0.1.0-SNAPSHOT" 
    :description "FIXME: write description" 
    :url "http://example.com/FIXME" 
    :license {:name "Eclipse Public License" 
      :url "http://www.eclipse.org/legal/epl-v10.html"} 
    :dependencies [[org.clojure/clojure    "1.5.1"] 
       [com.netflix.rxjava/rxjava-clojure "0.9.0"]] 
    :main expt2.core) 

Bây giờ, không gian tên và một yêu cầu Clojure và nhập khẩu Java :

(ns expt2.core 
    (:require clojure.pprint) 
    (:refer-clojure :exclude [distinct]) 
    (:import [rx Observable subscriptions.Subscriptions])) 

Cuối cùng, một macro cho đầu ra ra cửa sổ console:

(defmacro pdump [x] 
    `(let [x# ~x] 
    (do (println "----------------") 
     (clojure.pprint/pprint '~x) 
     (println "~~>") 
     (clojure.pprint/pprint x#) 
     (println "----------------") 
     x#))) 

Cuối cùng, để quan sát tôi. Tôi sử dụng một số agent để thu thập các tin nhắn được gửi bởi bất kỳ onNext quan sát nào. Tôi sử dụng số atom để thu thập số điện thoại onError. Tôi sử dụng một số promise cho số onCompleted để người tiêu dùng bên ngoài với người quan sát có thể chờ đợi.

(defn- subscribe-collectors [obl] 
    (let [;; Keep a sequence of all values sent: 
     onNextCollector  (agent []) 
     ;; Only need one value if the observable errors out: 
     onErrorCollector  (atom nil) 
     ;; Use a promise for 'completed' so we can wait for it on 
     ;; another thread: 
     onCompletedCollector (promise)] 
    (letfn [;; When observable sends a value, relay it to our agent" 
      (collect-next  [item] (send onNextCollector (fn [state] (conj state item)))) 
      ;; If observable errors out, just set our exception; 
      (collect-error  [excp] (reset! onErrorCollector  excp)) 
      ;; When observable completes, deliver on the promise: 
      (collect-completed [ ] (deliver onCompletedCollector true)) 
      ;; In all cases, report out the back end with this: 
      (report-collectors [ ] 
       (pdump 
       ;; Wait for everything that has been sent to the agent 
       ;; to drain (presumably internal message queues): 
       {:onNext  (do (await-for 1000 onNextCollector) 
           ;; Then produce the results: 
           @onNextCollector) 
       ;; If we ever saw an error, here it is: 
       :onError  @onErrorCollector 
       ;; Wait at most 1 second for the promise to complete; 
       ;; if it does not complete, then produce 'false'. 
       ;; I expect if this times out before the agent 
       ;; times out to see an 'onCompleted' of 'false'. 
       :onCompleted (deref onCompletedCollector 1000 false) 
       }))] 
     ;; Recognize that the observable 'obl' may run on another thread: 
     (-> obl 
      (.subscribe collect-next collect-error collect-completed)) 
     ;; Therefore, produce results that wait, with timeouts, on both 
     ;; the completion event and on the draining of the (presumed) 
     ;; message queue to the agent. 
     (report-collectors)))) 

Bây giờ, đây là đồng bộ quan sát được. Nó bơm 25 tin nhắn xuống onNext cổ họng của các quan sát viên của nó, sau đó gọi số onCompleted của họ.

(defn- customObservableBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method. 
     ;; Send 25 strings to the observer's onNext: 
     (doseq [x (range 25)] 
     (-> observer (.onNext (str "SynchedValue_" x)))) 
     ; After sending all values, complete the sequence: 
     (-> observer .onCompleted) 
     ; return a NoOpSubsription since this blocks and thus 
     ; can't be unsubscribed (disposed): 
     (Subscriptions/empty)))) 

Chúng tôi đăng ký quan sát của chúng tôi để có thể quan sát này:

;;; The value of the following is the list of all 25 events: 
(-> (customObservableBlocking) 
    (subscribe-collectors)) 

Nó hoạt động như mong đợi, và chúng ta thấy kết quả như sau trên console

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["SynchedValue_0" 
    "SynchedValue_1" 
    "SynchedValue_2" 
    "SynchedValue_3" 
    "SynchedValue_4" 
    "SynchedValue_5" 
    "SynchedValue_6" 
    "SynchedValue_7" 
    "SynchedValue_8" 
    "SynchedValue_9" 
    "SynchedValue_10" 
    "SynchedValue_11" 
    "SynchedValue_12" 
    "SynchedValue_13" 
    "SynchedValue_14" 
    "SynchedValue_15" 
    "SynchedValue_16" 
    "SynchedValue_17" 
    "SynchedValue_18" 
    "SynchedValue_19" 
    "SynchedValue_20" 
    "SynchedValue_21" 
    "SynchedValue_22" 
    "SynchedValue_23" 
    "SynchedValue_24"], 
:onError nil, 
:onCompleted true} 
---------------- 

Dưới đây là một không đồng bộ có thể quan sát mà không chính xác điều tương tự, chỉ trên chủ đề của future:

(defn- customObservableNonBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method 
     (let [f (future 
       ;; On another thread, send 25 strings: 
       (doseq [x (range 25)] 
        (-> observer (.onNext (str "AsynchValue_" x)))) 
       ; After sending all values, complete the sequence: 
       (-> observer .onCompleted))] 
     ; Return a disposable (unsubscribe) that cancels the future: 
     (Subscriptions/create #(future-cancel f)))))) 

;;; For unknown reasons, the following does not produce all 25 events: 
(-> (customObservableNonBlocking) 
    (subscribe-collectors)) 

Nhưng, bất ngờ, đây là những gì chúng ta thấy trên bảng điều khiển: true cho onCompleted, ngụ ý rằng promise KHÔNG PHẢI GIỜ; nhưng chỉ một số thông điệp asynch. Số lượng tin nhắn thực tế chúng ta thấy thay đổi từ chạy đến chạy, ngụ ý rằng có một số hiện tượng đồng thời lúc chơi. Các manh mối được đánh giá cao.

---------------- 
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["AsynchValue_0" 
    "AsynchValue_1" 
    "AsynchValue_2" 
    "AsynchValue_3" 
    "AsynchValue_4" 
    "AsynchValue_5" 
    "AsynchValue_6"], 
:onError nil, 
:onCompleted true} 
---------------- 

Trả lời

7

Các await-for trên đại lý có nghĩa Blocks thread hiện hành cho đến khi tất cả các hành động cử do đó xa (từ chủ đề này hoặc đại lý) cho các đại lý đã xảy ra, có nghĩa là nó có thể xảy ra sau khi chờ đợi của bạn là ở đó vẫn còn một số chủ đề khác có thể gửi tin nhắn cho các đại lý và đó là những gì đang xảy ra trong trường hợp của bạn. Sau khi bạn đang chờ đại lý kết thúc và bạn đã đặt giá trị của nó trong khóa :onNext trong bản đồ, sau đó bạn đợi lời hứa đã hoàn thành sau khi chờ, nhưng trong thời gian chờ đợi, một số tin nhắn khác được gửi đến đại lý được thu thập vào vectơ.

Bạn có thể giải quyết vấn đề này bằng phím :onCompleted làm khóa đầu tiên trên bản đồ, về cơ bản có nghĩa là chờ hoàn thành và sau đó đợi cho các đại lý coz vào thời điểm đó không còn có thể gọi số send. đã nhận được onCompleted.

{:onCompleted (deref onCompletedCollector 1000 false) 
:onNext  (do (await-for 0 onNextCollector) 
           @onNextCollector) 
:onError  @onErrorCollector 
} 
+0

Đã xác minh và kiểm tra. –