2012-02-09 24 views
6

Tôi đã cố gắng song song một số mã bằng cách sử dụng concurrent.futures.ProcessPoolExecutor nhưng đã giữ các lỗi chết lạ không xảy ra với ThreadPoolExecutor. Một ví dụ nhỏ:Bế tắc trong mã đồng thời.futures

from concurrent import futures 

def test(): 
    pass 

with futures.ProcessPoolExecutor(4) as executor: 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     executor.submit(test) 

Trong python 3.2.2 (trên 64-bit Ubuntu), điều này dường như để treo một cách nhất quán sau khi nộp tất cả các công việc - và điều này dường như xảy ra bất cứ khi nào số lượng việc làm nộp lớn hơn số lượng công nhân. Nếu tôi thay thế ProcessPoolExecutor bằng ThreadPoolExecutor nó hoạt động hoàn hảo.

Như một nỗ lực để điều tra, tôi đã từng tương lai một callback để in giá trị của i:

from concurrent import futures 

def test(): 
    pass 

with futures.ProcessPoolExecutor(4) as executor: 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     future = executor.submit(test) 

     def callback(f): 
      print('callback {}'.format(i)) 
     future.add_done_callback(callback) 

này chỉ nhầm lẫn tôi thậm chí nhiều hơn - giá trị của i in ra bởi callback là giá trị tại thời gian nó được gọi, thay vì tại thời điểm nó được xác định (vì vậy tôi không bao giờ thấy callback 0 nhưng tôi nhận được rất nhiều callback 99 s). Một lần nữa, ThreadPoolExecutor in ra giá trị mong đợi.

Tự hỏi nếu điều này có thể là một lỗi, tôi đã thử một phiên bản phát triển gần đây của python. Bây giờ, các mã ít nhất dường như chấm dứt, nhưng tôi vẫn nhận được giá trị sai của i in ra.

Vì vậy, bất cứ ai có thể giải thích:

  • những gì đã xảy ra với ProcessPoolExecutor ở giữa python 3.2 và các phiên bản dev hiện rằng dường như cố định bế tắc này

  • lý do tại sao giá trị 'sai' của i đang được in

EDIT: như jukiewicz chỉ ra dưới đây, tất nhiên in i sẽ in giá trị tại thời điểm gọi lại được gọi là, tôi không biết những gì tôi đã suy nghĩ ... nếu tôi vượt qua một đối tượng có thể gọi với giá trị i là một trong các thuộc tính của nó, hoạt động như mong đợi.

EDIT: thêm một chút thông tin: tất cả các cuộc gọi lại được thực thi, vì vậy có vẻ như là executor.shutdown (được gọi là executor.__exit__) không thể cho biết rằng các quy trình đã hoàn tất. Điều này dường như hoàn toàn cố định trong python 3.3 hiện tại, nhưng dường như đã có rất nhiều thay đổi đối với multiprocessingconcurrent.futures, vì vậy tôi không biết điều gì đã khắc phục điều này. Vì tôi không thể sử dụng 3.3 (nó dường như không tương thích với phiên bản phát hành hoặc phiên bản dev), tôi đã cố gắng sao chép các gói đa xử lý và đồng thời của mình sang cài đặt 3.2, điều này có vẻ ổn. Tuy nhiên, có vẻ hơi kỳ lạ - theo như tôi thấy - ProcessPoolExecutor hoàn toàn bị hỏng trong phiên bản phát hành mới nhất nhưng không ai khác bị ảnh hưởng.

+1

Đối với điều thứ hai, các quy trình in '99' là tự nhiên. biểu tượng 'i' bị ràng buộc bởi bối cảnh toàn cầu, và tạo ra các quy trình mới là tốn kém, do đó, vào thời điểm bạn nhận được để thực hiện bất cứ điều gì,' i == 99'. – julkiewicz

+1

Ngoài ra, tôi có Ubuntu 64 bit, Python 3.2.2 và đoạn mã đầu tiên không treo ... – julkiewicz

+0

@julkiewicz: điều đó rất lạ. Tôi đã thử nó trên một máy khác chạy 64-bit Scientific Linux và python 3.2.2, và nó bị trì hoãn sau khi in 'submit 99' trên 10 lần thử trong số 10. Tôi thậm chí đã thử gói mã trong 'if __name__ == ' __main __ ''như tôi đã nghe là cần thiết cho đa xử lý trên Windows. – James

Trả lời

2

Tôi đã sửa đổi mã như sau, giải quyết cả hai vấn đề. Chức năng callback được định nghĩa là đóng cửa, do đó sẽ sử dụng giá trị cập nhật i mỗi lần. Đối với bế tắc, đó có thể là một nguyên nhân của việc tắt các Executor trước khi tất cả các nhiệm vụ được hoàn thành. Chờ đợi cho tương lai để hoàn thành giải quyết đó, quá.

from concurrent import futures 

def test(i): 
    return i 

def callback(f): 
    print('callback {}'.format(f.result())) 


with futures.ProcessPoolExecutor(4) as executor: 
    fs = [] 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     future = executor.submit(test, i) 
     future.add_done_callback(callback) 
     fs.append(future) 

    for _ in futures.as_completed(fs): pass 

UPDATE: oh, xin lỗi, tôi chưa đọc nội dung cập nhật của bạn, dường như điều này đã được giải quyết.