Tôi xin lỗi trước về độ dài của ghi chú này. Tôi đã dành thời gian đáng kể làm cho nó ngắn hơn, và điều này là nhỏ như tôi có thể nhận được nó.rxjava và bí mật không đồng bộ clojure: tương lai hứa hẹn và đại lý, oh của tôi
Tôi có bí ẩn và sẽ biết ơn sự giúp đỡ của bạn. Bí ẩn này xuất phát từ hành vi của một rxjava observer
Tôi đã viết trong Clojure qua một vài nếp gấp đơn giản của observable
s từ các mẫu trực tuyến.
Một quan sát có thể đồng bộ gửi tin nhắn tới các trình xử lý của các quan sát viên của mình và người quan sát được cho là nguyên tắc của tôi hoạt động như mong đợi.
Các quan sát không đồng bộ khác thực hiện tương tự, trên một chuỗi khác, qua Clojure future
. Người quan sát chính xác không chụp tất cả các sự kiện được đăng lên onNext
; nó dường như mất một số lượng tin nhắn ngẫu nhiên ở đuôi.
Có một cuộc đua cố ý sau đây khi hết hạn chờ đợi promise
d onCompleted
và hết hạn chờ đợi cho tất cả các sự kiện được gửi đến một nhà sưu tập agent
. Nếu chiến thắng promise
, tôi hy vọng sẽ thấy false
cho onCompleted
và hàng đợi có thể ngắn trong agent
. Nếu số agent
thắng, tôi hy vọng sẽ thấy true
cho onCompleted
và tất cả thư từ hàng đợi của agent
. Một kết quả tôi KHÔNG mong đợi là true
cho onCompleted
VÀ một hàng đợi ngắn từ agent
. Nhưng, Murphy không ngủ, và đó là chính xác những gì tôi thấy. Tôi không biết liệu bộ sưu tập rác có bị lỗi hay không, hoặc một số hàng đợi bên trong để STM của Clojure, hoặc sự ngu xuẩn của tôi, hoặc một thứ gì đó hoàn toàn khác.
Tôi trình bày nguồn theo thứ tự của biểu mẫu khép kín, tại đây, để nó có thể được chạy trực tiếp qua lein repl
. Có ba cermonials để có được ra khỏi con đường: thứ nhất, hồ sơ dự án leiningen, project.clj
, mà tuyên bố sự phụ thuộc vào phiên bản 0.9.0
của rxjava của Netflix:
(defproject expt2 "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
[com.netflix.rxjava/rxjava-clojure "0.9.0"]]
:main expt2.core)
Bây giờ, không gian tên và một yêu cầu Clojure và nhập khẩu Java :
(ns expt2.core
(:require clojure.pprint)
(:refer-clojure :exclude [distinct])
(:import [rx Observable subscriptions.Subscriptions]))
Cuối cùng, một macro cho đầu ra ra cửa sổ console:
(defmacro pdump [x]
`(let [x# ~x]
(do (println "----------------")
(clojure.pprint/pprint '~x)
(println "~~>")
(clojure.pprint/pprint x#)
(println "----------------")
x#)))
Cuối cùng, để quan sát tôi. Tôi sử dụng một số agent
để thu thập các tin nhắn được gửi bởi bất kỳ onNext
quan sát nào. Tôi sử dụng số atom
để thu thập số điện thoại onError
. Tôi sử dụng một số promise
cho số onCompleted
để người tiêu dùng bên ngoài với người quan sát có thể chờ đợi.
(defn- subscribe-collectors [obl]
(let [;; Keep a sequence of all values sent:
onNextCollector (agent [])
;; Only need one value if the observable errors out:
onErrorCollector (atom nil)
;; Use a promise for 'completed' so we can wait for it on
;; another thread:
onCompletedCollector (promise)]
(letfn [;; When observable sends a value, relay it to our agent"
(collect-next [item] (send onNextCollector (fn [state] (conj state item))))
;; If observable errors out, just set our exception;
(collect-error [excp] (reset! onErrorCollector excp))
;; When observable completes, deliver on the promise:
(collect-completed [ ] (deliver onCompletedCollector true))
;; In all cases, report out the back end with this:
(report-collectors [ ]
(pdump
;; Wait for everything that has been sent to the agent
;; to drain (presumably internal message queues):
{:onNext (do (await-for 1000 onNextCollector)
;; Then produce the results:
@onNextCollector)
;; If we ever saw an error, here it is:
:onError @onErrorCollector
;; Wait at most 1 second for the promise to complete;
;; if it does not complete, then produce 'false'.
;; I expect if this times out before the agent
;; times out to see an 'onCompleted' of 'false'.
:onCompleted (deref onCompletedCollector 1000 false)
}))]
;; Recognize that the observable 'obl' may run on another thread:
(-> obl
(.subscribe collect-next collect-error collect-completed))
;; Therefore, produce results that wait, with timeouts, on both
;; the completion event and on the draining of the (presumed)
;; message queue to the agent.
(report-collectors))))
Bây giờ, đây là đồng bộ quan sát được. Nó bơm 25 tin nhắn xuống onNext
cổ họng của các quan sát viên của nó, sau đó gọi số onCompleted
của họ.
(defn- customObservableBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method.
;; Send 25 strings to the observer's onNext:
(doseq [x (range 25)]
(-> observer (.onNext (str "SynchedValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted)
; return a NoOpSubsription since this blocks and thus
; can't be unsubscribed (disposed):
(Subscriptions/empty))))
Chúng tôi đăng ký quan sát của chúng tôi để có thể quan sát này:
;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)
(subscribe-collectors))
Nó hoạt động như mong đợi, và chúng ta thấy kết quả như sau trên console
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["SynchedValue_0"
"SynchedValue_1"
"SynchedValue_2"
"SynchedValue_3"
"SynchedValue_4"
"SynchedValue_5"
"SynchedValue_6"
"SynchedValue_7"
"SynchedValue_8"
"SynchedValue_9"
"SynchedValue_10"
"SynchedValue_11"
"SynchedValue_12"
"SynchedValue_13"
"SynchedValue_14"
"SynchedValue_15"
"SynchedValue_16"
"SynchedValue_17"
"SynchedValue_18"
"SynchedValue_19"
"SynchedValue_20"
"SynchedValue_21"
"SynchedValue_22"
"SynchedValue_23"
"SynchedValue_24"],
:onError nil,
:onCompleted true}
----------------
Dưới đây là một không đồng bộ có thể quan sát mà không chính xác điều tương tự, chỉ trên chủ đề của future
:
(defn- customObservableNonBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method
(let [f (future
;; On another thread, send 25 strings:
(doseq [x (range 25)]
(-> observer (.onNext (str "AsynchValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted))]
; Return a disposable (unsubscribe) that cancels the future:
(Subscriptions/create #(future-cancel f))))))
;;; For unknown reasons, the following does not produce all 25 events:
(-> (customObservableNonBlocking)
(subscribe-collectors))
Nhưng, bất ngờ, đây là những gì chúng ta thấy trên bảng điều khiển: true
cho onCompleted
, ngụ ý rằng promise
KHÔNG PHẢI GIỜ; nhưng chỉ một số thông điệp asynch. Số lượng tin nhắn thực tế chúng ta thấy thay đổi từ chạy đến chạy, ngụ ý rằng có một số hiện tượng đồng thời lúc chơi. Các manh mối được đánh giá cao.
----------------
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["AsynchValue_0"
"AsynchValue_1"
"AsynchValue_2"
"AsynchValue_3"
"AsynchValue_4"
"AsynchValue_5"
"AsynchValue_6"],
:onError nil,
:onCompleted true}
----------------
Đã xác minh và kiểm tra. –