Khi nào thì sử dụng chúng
Nếu bạn cần nhiều hơn hai điểm để giao tiếp, sử dụng một Queue()
.
Nếu bạn cần hiệu suất tuyệt đối, Pipe()
nhanh hơn nhiều vì Queue()
được xây dựng trên đầu trang của Pipe()
.
Hiệu suất điểm chuẩn
Giả sử bạn muốn đẻ trứng hai quá trình và gửi tin nhắn giữa chúng càng nhanh càng tốt. Đây là kết quả thời gian của cuộc đua kéo giữa các thử nghiệm tương tự sử dụng Pipe()
và Queue()
... Đây là trên ThinkpadT61 chạy Ubuntu 11.10 và Python 2.7.2.
FYI, tôi đã ném kết quả cho JoinableQueue()
làm tiền thưởng; JoinableQueue()
tài khoản cho các nhiệm vụ khi queue.task_done()
được gọi (thậm chí không biết về nhiệm vụ cụ thể, nó chỉ đếm các công việc chưa hoàn thành trong hàng đợi), để queue.join()
biết công việc đã hoàn tất.
Mã cho từng ở phía dưới của câu trả lời này ...
[email protected]:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
[email protected]:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
[email protected]:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
[email protected]:~$
Nói tóm lại Pipe()
là nhanh hơn so với một Queue()
khoảng ba lần. Thậm chí không suy nghĩ về các JoinableQueue()
trừ khi bạn thực sự phải có những lợi ích.
THƯỞNG TÀI LIỆU 2
Đa giới thiệu những thay đổi tinh tế trong dòng chảy thông tin mà làm cho gỡ lỗi khó trừ khi bạn biết một số phím tắt. Ví dụ, bạn có thể có một kịch bản hoạt động tốt khi lập chỉ mục thông qua một từ điển trong nhiều điều kiện, nhưng thỉnh thoảng không thành công với một số đầu vào nhất định.
Thông thường chúng tôi nhận được manh mối về sự thất bại khi toàn bộ quá trình python bị treo; tuy nhiên, bạn không nhận được các dấu vết sự cố không mong muốn được in trên bảng điều khiển nếu chức năng đa xử lý bị treo. Theo dõi xuống các lỗi treo đa xử lý không xác định là khó mà không có manh mối về những gì đã xảy ra.
Cách đơn giản nhất tôi đã tìm thấy để theo dõi đa xử vụ tai nạn informaiton là để bọc toàn bộ chức năng đa xử lý trong một try
/except
và sử dụng traceback.print_exc()
:
import traceback
def reader(args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
Bây giờ, khi bạn tìm thấy một vụ tai nạn mà bạn nhìn thấy cái gì đó như :
FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(task_q, result_q)
File "foo.py", line 46, in run
raise ValueError
ValueError
Source Code:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader(pipe):
output_p, input_p = pipe
input_p.close() # We are only reading
while True:
try:
msg = output_p.recv() # Read from the output pipe and do nothing
except EOFError:
break
def writer(count, input_p):
for ii in xrange(0, count):
input_p.send(ii) # Write 'count' numbers into the input pipe
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
output_p, input_p = Pipe()
reader_p = Process(target=reader, args=((output_p, input_p),))
reader_p.start() # Launch the reader process
output_p.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, input_p) # Send a lot of stuff to reader()
input_p.close() # Ask the reader to stop when it reads EOF
reader_p.join()
print "Sending %s numbers to Pipe() took %s seconds" % (count,
(time.time() - _start))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
def reader(queue):
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = Queue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print "Sending %s numbers to Queue() took %s seconds" % (count,
(time.time() - _start))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader(queue):
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = JoinableQueue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
queue.join() # Wait for the reader to finish
print "Sending %s numbers to JoinableQueue() took %s seconds" % (count,
(time.time() - _start))
@ Jonathan "Trong ống tóm tắt() nhanh hơn khoảng ba lần một Queue()" –
Nhưng Pipe() có thể không an toàn được sử dụng với nhiều nhà sản xuất/người tiêu dùng. –
Tuyệt vời! Tốt câu trả lời và tốt đẹp mà bạn cung cấp các tiêu chuẩn! Tôi chỉ có hai câu hỏi nhỏ: (1) "đơn đặt hàng của cường độ nhanh hơn" là một chút quá mức. Sự khác biệt là x3, khoảng một phần ba của một bậc độ lớn. Chỉ cần nói. ;-); và (2) một so sánh công bằng hơn sẽ chạy N công nhân, mỗi giao tiếp với chủ đề chính thông qua điểm-to-điểm ống so với hiệu suất của chạy N công nhân tất cả kéo từ một đơn điểm-to-đa hàng đợi. – JJC