2012-01-21 22 views
13

tôi nhận thấy rằng một ổ cắm PUB zeromq sẽ đệm tất cả các dữ liệu gửi đi nếu nó là kết nối, ví dụZeroMQ PUB ổ cắm đệm tất cả ra tôi sẽ dữ liệu khi nó được kết nối

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.connect("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.bind("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print sub.recv() 

Tiểu liên kết sau những thông điệp, họ nên được loại bỏ, bởi vì PUB sẽ thả tin nhắn nếu không có ai kết nối với nó. Nhưng thay vì bỏ thư, nó sẽ lưu tất cả thư.

a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
hi 

Như bạn thấy, những "tin nhắn không nên bỏ" được đệm bởi ổ cắm, sau khi được kết nối, nó sẽ đưa chúng vào ổ cắm SUB. Nếu tôi kết nối vào ổ cắm PUB, và kết nối tại ổ cắm SUB, thì nó hoạt động chính xác.

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.bind("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.connect("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print repr(sub.recv()) 

Và bạn chỉ có thể nhìn thấy đầu ra

'hi' 

Loại hành vi này lạ gây ra một vấn đề, nó đệm tất cả dữ liệu trên một ổ cắm kết nối, tôi có hai máy chủ, máy chủ A công bố dữ liệu đến máy chủ B

Server A -- publish --> Server B 

Nó hoạt động tốt nếu máy chủ B trực tuyến. Nhưng nếu tôi khởi động Máy chủ A và không khởi động Máy chủ B thì sao?

Kết quả là, ổ cắm kết nối PUB trên Máy chủ A giữ tất cả dữ liệu đó, việc sử dụng bộ nhớ ngày càng cao.

Đây là vấn đề, loại hành vi này là lỗi hoặc tính năng? Nếu đó là tính năng, tôi có thể tìm tài liệu đề cập đến hành vi này ở đâu? Và làm thế nào tôi có thể ngừng kết nối ổ cắm PUB tất cả dữ liệu?

Cảm ơn.

Trả lời

6

Cho dù các khối ổ cắm hoặc giảm thông điệp phụ thuộc vào loại ổ cắm như mô tả trong ZMQ::Socket documentation (nhấn mạnh dưới đây là của tôi):

ZMQ :: HWM: Lấy dấu nước cao

Các ZMQ: : Tùy chọn HWM sẽ lấy dấu nước cao cho ổ cắm được chỉ định . Dấu hiệu nước cao là một giới hạn cứng trên số lượng tối đa số lượng tin nhắn chưa trả 0MQ sẽ xếp hàng trong bộ nhớ cho bất kỳ máy ngang hàng đơn lẻ nào mà cổng được chỉ định đang kết nối với bộ nhớ .

Nếu giới hạn này đã đạt đến ổ cắm, hãy nhập trạng thái đặc biệt và tùy thuộc vào loại socket, 0MQ sẽ thực hiện hành động thích hợp như chặn hoặc thả tin nhắn đã gửi. Tham khảo mô tả ổ cắm riêng lẻ trong ZMQ :: Ổ cắm để biết chi tiết về chính xác hành động được thực hiện cho từng loại ổ cắm.

Giá trị ZMQ :: HWM mặc định bằng 0 có nghĩa là "không có giới hạn".

Bạn có thể xem nếu nó sẽ chặn hoặc thả bằng cách xem qua các tài liệu cho các loại ổ cắm cho ZMQ::HWM option action mà một trong hai sẽ Block hoặc Drop.

Hành động cho ZMQ::PUBDrop, vì vậy nếu nó không bị giảm, bạn nên kiểm tra HWM (High Water Mark) giá trị và chú ý đến cảnh báo rằng Giá trị mặc định giá trị ZMQ :: HWM của zero có nghĩa là “không có giới hạn”, có nghĩa là nó sẽ không đi vào trạng thái ngoại lệ cho đến khi hệ thống hết bộ nhớ (tại thời điểm đó tôi không biết nó hoạt động như thế nào).

+0

Tôi biết tôi có thể đặt HWM để giới hạn số thư trong bộ đệm. Nhưng nó không giải quyết vấn đề, họ cách PUB xử lý HWM nhà nước là để thả tin nhắn mới. Nó có nghĩa là nếu bạn thiết lập HWM, chỉ có các tin nhắn hàng đầu được lưu giữ trong bộ đệm. Những gì tôi đang viết là hệ thống phát trực tuyến âm thanh. Loại hành vi này làm cho nó rất khó chịu khi sử dụng. Giả sử, bạn gửi tin nhắn [1, 2, 3, 4], và sau đó HWM được đặt thành 2, sau đó socket sẽ đệm [1, 2] cho bạn, tất cả các tin nhắn mới sẽ bị xóa. Nhưng đối với phát trực tuyến âm thanh, phần quan trọng nhất là dữ liệu mới đến. Có cách nào để điều chỉnh cách HWM giảm tin nhắn không? –

+0

Ah, do đó, bạn có nghĩa là hành vi bạn muốn là nếu HWM được đặt thành 2 và bạn gửi [1, 2, 3, 4] thì nó sẽ thả [1, 2] và giữ [3, 4], nhưng sau đó nếu bạn gửi 5 nó sẽ thả 3 và bạn kết thúc với [4, 5]? Tôi không nghĩ rằng hành vi đó tồn tại trong ZMQ. – aculich

+0

Điều này rất thú vị. Chắc chắn có khả năng để thả "cũ" tin nhắn sẽ là cần thiết cho một số ứng dụng (điện thoại IP đến với tâm trí như là một ví dụ phổ biến). –

0

Vì vậy, ràng buộc() và kết nối() dẫn đến hai hành vi khác nhau. Tại sao bạn không chỉ chọn cái nào bạn thích (nó có vẻ như bind()) và sử dụng nó?

Nó thực sự là một tính năng của ZeroMQ nói chung rằng nó đệm các thư gửi đi cho đến khi kết nối được thực hiện.

+0

Vì tôi có nhiều nút muốn xuất bản dữ liệu lên một máy chủ nổi tiếng. Tất nhiên tôi có thể ràng buộc ở phía PUB, nhưng kết quả là, tôi cần N địa chỉ cho mỗi nút, máy chủ không biết có bao nhiêu nút sẽ có. Tôi nghĩ rằng ràng buộc và kết nối không nên ảnh hưởng đến hành vi, một khi kết nối được thực hiện, không có sự khác biệt giữa ràng buộc và kết nối, sau đó tại sao làm cho sự khác biệt? Tôi không hiểu: S –

+0

Oh OK. Tôi nghĩ ZeroMQ hoạt động như mong đợi và được thiết kế, vì vậy bạn có thể phải truy vấn kết nối trước khi gửi dữ liệu. –

+0

@JohnZwinck Chọn 'bind()' vs 'connect()' không dựa trên sở thích, nhưng thay vào đó nên dựa trên cách nó được sử dụng. Ông đang sử dụng nó một cách chính xác với 'bind()' trên máy chủ (nhà xuất bản) và 'connect()' trên máy khách (người đăng ký). Và nó không phải lúc nào cũng đệm các thư gửi đi, mà thay vào đó nó được xác định bởi kiểu socket và giá trị của dấu nước cao như [được giải thích ở đây với các tham chiếu đến tài liệu] (http://stackoverflow.com/a/8958699/462302). – aculich

0

Bạn sẽ có thể đặt dấu nước cao trong ổ cắm bằng cách sử dụng hwm settingom the pub socket. Nó cho phép bạn xác định số lượng thư được lưu giữ.

1

Chúng đặt tùy chọn HWM trên ổ cắm.

4

Tôi cảm thấy hành vi này là ngữ nghĩa của zmq_connect(). Đó là: khi zmq_connect() trả về thành công, khi đó kết nối được thiết lập khái niệm và do đó kết nối PUB của bạn bắt đầu gửi hàng đợi thay vì giảm.

Tiếp theo đoạn trích từ "ZMQ Guide" là một gợi ý cho việc này:

Về lý thuyết với ổ cắm ØMQ, nó không quan trọng mà cuối kết nối, và mà cuối liên kết. Tuy nhiên với ổ cắm PUB-SUB, nếu bạn kết nối ổ cắm SUB và kết nối với ổ cắm PUB, ổ cắm SUB có thể nhận được thông báo cũ , tức là tin nhắn được gửi trước khi SUB khởi động. Đây là một hiện vật cách liên kết/kết nối hoạt động. Tốt nhất là ràng buộc PUB và kết nối SUB, nếu có thể.

Tiếp theo phần trong zmq_connect() có một số gợi ý, hiển thị dưới đây:

khác biệt chủ chốt để ổ cắm thông thường

Nói chung, ổ cắm thông thường trình bày một giao diện đồng bộ cho một trong hai hướng kết nối đáng tin cậy các luồng byte (SOCK_STREAM) hoặc các datagram không đáng tin cậy kết nối (SOCK_DGRAM). Để so sánh, các ổ cắm ØMQ thể hiện sự trừu tượng của một hàng đợi tin nhắn không đồng bộ , với ngữ nghĩa xếp hàng chính xác tùy thuộc vào loại ổ cắm đang sử dụng. Nơi các ổ cắm thông thường truyền các luồng của byte hoặc các gói dữ liệu rời rạc, các cổng ØMQ truyền các thông điệp rời rạc.

socket ØMQ là không đồng bộ có nghĩa là timings của việc thiết lập kết nối vật lý và xé xuống, kết nối lại và phân phối hiệu quả là trong suốt đối với người sử dụng và bởi ØMQ tự tổ chức. Hơn nữa, tin nhắn có thể được xếp hàng đợi trong trường hợp một người ngang hàng không có mặt để nhận chúng.

0

Dưới đây là một hack mà có thể giúp đỡ ...

Đặt bạn ZMQ::HWM đến một số cố định, nói 10.Khi kết nối, hãy gọi phương thức recv của ổ cắm thuê bao trong một vòng lặp cho đến khi nó loại bỏ tất cả các tin nhắn được đệm, và chỉ THEN bắt đầu vòng lặp nhận chính của bạn.