Khi bạn cần phải lắng nghe trên ổ cắm khác nhau trong cùng một thread, sử dụng một poller:
ZMQ.Socket subscriber = ctx.socket(ZMQ.SUB)
ZMQ.Socket puller = ctx.socket(ZMQ.PULL)
socket ký với poller (POLLIN
lắng nghe cho các tin nhắn đến)
ZMQ.Poller poller = ZMQ.Poller(2)
poller.register(subscriber, ZMQ.Poller.POLLIN)
poller.register(puller, ZMQ.Poller.POLLIN)
Khi bỏ phiếu, sử dụng vòng lặp:
while(notInterrupted()){
poller.poll()
//subscriber registered at index '0'
if(poller.pollin(0))
subscriber.recv(ZMQ.DONTWAIT)
//puller registered at index '1'
if(poller.pollin(1))
puller.recv(ZMQ.DONTWAIT)
}
Chọn cách bạn muốn thăm dò ý kiến ...
poller.poll()
khối cho đến khi có dữ liệu trên ổ cắm.
poller.poll(1000)
khối trong 1 giây, sau đó hết giờ.
Poller thông báo khi có dữ liệu (tin nhắn) có sẵn trên ổ cắm; đó là công việc của bạn để đọc nó.
Khi đọc, hãy thực hiện mà không chặn: socket.recv(ZMQ.DONTWAIT)
. Mặc dù poller.pollin(0)
kiểm tra xem có dữ liệu để đọc hay không, bạn muốn tránh bất kỳ cuộc gọi chặn nào bên trong vòng lặp bỏ phiếu, nếu không, bạn có thể sẽ chặn poller do ổ cắm bị kẹt.
Vì vậy, nếu hai thông điệp riêng biệt được gửi đến subscriber
, bạn phải gọi subscriber.recv()
hai lần để xóa poller, nếu không, nếu bạn gọi subscriber.recv()
một lần, các poller sẽ luôn nói với bạn có một thông báo để được đọc. Vì vậy, về bản chất, các poller theo dõi sự sẵn có và số lượng tin nhắn, không phải là thông điệp thực tế.
Bạn nên chạy qua các ví dụ bỏ phiếu và chơi với mã, đó là cách tốt nhất để tìm hiểu.
Điều đó có trả lời câu hỏi của bạn không?
Bạn có thể nêu rõ hơn những câu hỏi cụ thể của mình không? – xaxxon