2011-12-11 24 views

Trả lời

180
  • A Pipe() chỉ có thể có hai điểm cuối.

  • A Queue() có thể có nhiều nhà sản xuất và người tiêu dùng.

Khi nào thì sử dụng chúng

Nếu bạn cần nhiều hơn hai điểm để giao tiếp, sử dụng một Queue().

Nếu bạn cần hiệu suất tuyệt đối, Pipe() nhanh hơn nhiều vì Queue() được xây dựng trên đầu trang của Pipe().

Hiệu suất điểm chuẩn

Giả sử bạn muốn đẻ trứng hai quá trình và gửi tin nhắn giữa chúng càng nhanh càng tốt. Đây là kết quả thời gian của cuộc đua kéo giữa các thử nghiệm tương tự sử dụng Pipe()Queue() ... Đây là trên ThinkpadT61 chạy Ubuntu 11.10 và Python 2.7.2.

FYI, tôi đã ném kết quả cho JoinableQueue() làm tiền thưởng; JoinableQueue() tài khoản cho các nhiệm vụ khi queue.task_done() được gọi (thậm chí không biết về nhiệm vụ cụ thể, nó chỉ đếm các công việc chưa hoàn thành trong hàng đợi), để queue.join() biết công việc đã hoàn tất.

Mã cho từng ở phía dưới của câu trả lời này ...

[email protected]:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds 
Sending 100000 numbers to Pipe() took 0.328398942947 seconds 
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds 
[email protected]:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds 
Sending 100000 numbers to Queue() took 0.980564117432 seconds 
Sending 1000000 numbers to Queue() took 10.1611330509 seconds 
[email protected]:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds 
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds 
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds 
[email protected]:~$ 

Nói tóm lại Pipe() là nhanh hơn so với một Queue() khoảng ba lần. Thậm chí không suy nghĩ về các JoinableQueue() trừ khi bạn thực sự phải có những lợi ích.

THƯỞNG TÀI LIỆU 2

Đa giới thiệu những thay đổi tinh tế trong dòng chảy thông tin mà làm cho gỡ lỗi khó trừ khi bạn biết một số phím tắt. Ví dụ, bạn có thể có một kịch bản hoạt động tốt khi lập chỉ mục thông qua một từ điển trong nhiều điều kiện, nhưng thỉnh thoảng không thành công với một số đầu vào nhất định.

Thông thường chúng tôi nhận được manh mối về sự thất bại khi toàn bộ quá trình python bị treo; tuy nhiên, bạn không nhận được các dấu vết sự cố không mong muốn được in trên bảng điều khiển nếu chức năng đa xử lý bị treo. Theo dõi xuống các lỗi treo đa xử lý không xác định là khó mà không có manh mối về những gì đã xảy ra.

Cách đơn giản nhất tôi đã tìm thấy để theo dõi đa xử vụ tai nạn informaiton là để bọc toàn bộ chức năng đa xử lý trong một try/except và sử dụng traceback.print_exc():

import traceback 
def reader(args): 
    try: 
     # Insert stuff to be multiprocessed here 
     return args[0]['that'] 
    except: 
     print "FATAL: reader({0}) exited while multiprocessing".format(args) 
     traceback.print_exc() 

Bây giờ, khi bạn tìm thấy một vụ tai nạn mà bạn nhìn thấy cái gì đó như :

FATAL: reader([{'crash', 'this'}]) exited while multiprocessing 
Traceback (most recent call last): 
    File "foo.py", line 19, in __init__ 
    self.run(task_q, result_q) 
    File "foo.py", line 46, in run 
    raise ValueError 
ValueError 

Source Code:


""" 
multi_pipe.py 
""" 
from multiprocessing import Process, Pipe 
import time 

def reader(pipe): 
    output_p, input_p = pipe 
    input_p.close() # We are only reading 
    while True: 
     try: 
      msg = output_p.recv() # Read from the output pipe and do nothing 
     except EOFError: 
      break 

def writer(count, input_p): 
    for ii in xrange(0, count): 
     input_p.send(ii)    # Write 'count' numbers into the input pipe 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     output_p, input_p = Pipe() 
     reader_p = Process(target=reader, args=((output_p, input_p),)) 
     reader_p.start()  # Launch the reader process 

     output_p.close()  # We no longer need this part of the Pipe() 
     _start = time.time() 
     writer(count, input_p) # Send a lot of stuff to reader() 
     input_p.close()  # Ask the reader to stop when it reads EOF 
     reader_p.join() 
     print "Sending %s numbers to Pipe() took %s seconds" % (count, 
      (time.time() - _start)) 

""" 
multi_queue.py 
""" 
from multiprocessing import Process, Queue 
import time 

def reader(queue): 
    while True: 
     msg = queue.get()   # Read from the queue and do nothing 
     if (msg == 'DONE'): 
      break 

def writer(count, queue): 
    for ii in xrange(0, count): 
     queue.put(ii)    # Write 'count' numbers into the queue 
    queue.put('DONE') 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     queue = Queue() # reader() reads from queue 
          # writer() writes to queue 
     reader_p = Process(target=reader, args=((queue),)) 
     reader_p.daemon = True 
     reader_p.start()  # Launch the reader process 

     _start = time.time() 
     writer(count, queue) # Send a lot of stuff to reader() 
     reader_p.join()   # Wait for the reader to finish 
     print "Sending %s numbers to Queue() took %s seconds" % (count, 
      (time.time() - _start)) 

""" 
multi_joinablequeue.py 
""" 
from multiprocessing import Process, JoinableQueue 
import time 

def reader(queue): 
    while True: 
     msg = queue.get()   # Read from the queue and do nothing 
     queue.task_done() 

def writer(count, queue): 
    for ii in xrange(0, count): 
     queue.put(ii)    # Write 'count' numbers into the queue 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     queue = JoinableQueue() # reader() reads from queue 
            # writer() writes to queue 
     reader_p = Process(target=reader, args=((queue),)) 
     reader_p.daemon = True 
     reader_p.start()  # Launch the reader process 

     _start = time.time() 
     writer(count, queue) # Send a lot of stuff to reader() 
     queue.join()   # Wait for the reader to finish 
     print "Sending %s numbers to JoinableQueue() took %s seconds" % (count, 
      (time.time() - _start)) 
+2

@ Jonathan "Trong ống tóm tắt() nhanh hơn khoảng ba lần một Queue()" –

+0

Nhưng Pipe() có thể không an toàn được sử dụng với nhiều nhà sản xuất/người tiêu dùng. –

+11

Tuyệt vời! Tốt câu trả lời và tốt đẹp mà bạn cung cấp các tiêu chuẩn! Tôi chỉ có hai câu hỏi nhỏ: (1) "đơn đặt hàng của cường độ nhanh hơn" là một chút quá mức. Sự khác biệt là x3, khoảng một phần ba của một bậc độ lớn. Chỉ cần nói. ;-); và (2) một so sánh công bằng hơn sẽ chạy N công nhân, mỗi giao tiếp với chủ đề chính thông qua điểm-to-điểm ống so với hiệu suất của chạy N công nhân tất cả kéo từ một đơn điểm-to-đa hàng đợi. – JJC