Tôi đang cố gắng tích hợp Storm (see here) vào dự án của mình. Tôi mò mẫm các khái niệm về topo, vòi và bu lông. Nhưng bây giờ, tôi đang cố gắng tìm ra việc thực hiện thực tế của một vài điều.Bão> Cách tích hợp cuộc gọi lại Java vào Spout
A) Tôi có môi trường đa ngôn ngữ với Java và Clojure. Mã Java của tôi là một lớp gọi lại với các phương thức kích hoạt dữ liệu trực tuyến. Dữ liệu sự kiện được đẩy vào các phương thức đó, là những gì tôi muốn sử dụng như một vòi.
Vì vậy, câu hỏi đầu tiên là làm thế nào để kết nối dữ liệu vào các phương pháp đó, với một vòi? Tôi đang cố gắng để i) vượt qua một backtype.storm.topology.IRichSpout, sau đó ii) vượt qua một backtype.storm.spout.SpoutOutputCollector (see here) để mở chức năng của vòi đó (see here). Nhưng tôi không thể nhìn thấy một cách để thực sự vượt qua trong bất kỳ loại bản đồ hoặc danh sách.
B) Phần còn lại của dự án của tôi là tất cả Clojure. Sẽ có rất nhiều dữ liệu đi qua các phương pháp đó. Mỗi sự kiện sẽ có một ID từ 1 đến 100. Trong Clojure, tôi sẽ muốn tách dữ liệu đến từ vòi, thành các luồng thực thi khác nhau. Những người đó, tôi nghĩ, sẽ là bu lông.
Làm cách nào tôi có thể thiết lập chốt Clojure để lấy dữ liệu sự kiện từ vòi, sau đó ngắt một chuỗi dựa trên ID của sự kiện đến?
Cảm ơn trước Tim
[EDIT 1]
Tôi đã thực sự nhận được qua vấn đề này. Tôi đã kết thúc 1) triển khai IRichSpout của riêng tôi. Sau đó tôi 2) kết nối bộ nội bộ của vòi đó với dữ liệu luồng đến trong lớp gọi lại java của tôi. Tôi không chắc đây có phải là thành ngữ hay không. Nhưng nó biên dịch và chạy mà không có lỗi. Tuy nhiên, 3) Tôi không thấy dữ liệu luồng đến (chắc chắn có), đi qua printstuff bolt.
Để đảm bảo rằng dữ liệu sự kiện được truyền bá, có điều gì đó cụ thể mà tôi phải thực hiện trong quá trình thực hiện vòi hoặc bu lông hoặc định nghĩa cấu trúc liên kết không? Cảm ơn.
;; tie Java callbacks to a Spout that I created (.setSpout java-callback ibspout) (storm/defbolt printstuff ["word"] [tuple collector] (println (str "printstuff --> tuple["tuple"] > collector["collector"]")) ) (storm/topology { "1" (storm/spout-spec ibspout) } { "3" (storm/bolt-spec { "1" :shuffle } printstuff ) })
[EDIT 2]
Theo lời khuyên của SO viên Ankur, tôi rejigging topo của tôi. Sau khi tôi đã tạo cuộc gọi lại Java của mình, tôi chuyển nó tới bộ IBSpout bên dưới, sử dụng (.setTuple ibspout (.getTuple java-callback))
. Tôi không vượt qua toàn bộ đối tượng gọi lại Java, bởi vì tôi nhận được một lỗi NotSerializable. Mọi thứ biên dịch và chạy mà không có lỗi. Nhưng một lần nữa, không có dữ liệu đến instuff bolt của tôi. Hừm.
public class IBSpout implements IRichSpout { /** * Storm spout stuff */ private SpoutOutputCollector _collector; private List _tuple = new ArrayList(); public void setTuple(List tuple) { _tuple = tuple; } public List getTuple() { return _tuple; } /** * Storm ISpout interface functions */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() {} public void activate() {} public void deactivate() {} public void nextTuple() { _collector.emit(_tuple); } public void ack(Object msgId) {} public void fail(Object msgId) {} public void declareOutputFields(OutputFieldsDeclarer declarer) {} public java.util.Map getComponentConfiguration() { return new HashMap(); } }
Xin chào, cảm ơn vì đã xem xét điều này. Tôi thực sự đã chỉ định một **: shuffle ** để cân bằng khối lượng công việc. Vấn đề tôi đang gặp phải là tôi không thấy dữ liệu sự kiện của mình được truyền tới bu-lông của tôi (xem chỉnh sửa abouve). Bất kỳ cái nhìn sâu sắc ở đó, được đánh giá cao. – Nutritioustim
@Nutritioustim bạn có thực sự tìm ra vấn đề là gì không? – Vor
@Vor, Không. Bão có vẻ hơi quá khó chịu đối với những gì tôi đang cố gắng làm. Bây giờ *** [Lamina] (https://github.com/ztellman/lamina) *** đáp ứng nhu cầu của tôi. HTH. – Nutritioustim