Tôi đang cố gắng tìm trọng lượng tối đa khoảng 6,1 tỷ (mục tùy chỉnh) và tôi muốn thực hiện việc này với xử lý song song. Đối với ứng dụng cụ thể của tôi có những thuật toán tốt hơn mà không yêu cầu của tôi iterating hơn 6.1 tỷ mục, nhưng sách giáo khoa giải thích họ là trên đầu của tôi và ông chủ của tôi muốn điều này được thực hiện trong 4 ngày. Tôi nghĩ rằng tôi có một shot tốt hơn với máy chủ ưa thích của công ty tôi và xử lý song song. Tuy nhiên, mọi thứ tôi biết về xử lý song song đều đến từ việc đọc số Pythondocumentation. Mà là để nói rằng tôi khá bị mất ...Tránh điều kiện cuộc đua trong Hàng đợi đa xử lý của Python 3
Lý thuyết hiện tại của tôi là thiết lập quy trình nạp, hàng đợi đầu vào, toàn bộ bó (ví dụ, 30) quy trình công nhân và hàng đợi đầu ra (tìm phần tử tối đa trong hàng đợi đầu ra sẽ không quan trọng). Những gì tôi không hiểu là làm thế nào quá trình feeder có thể nói với các quy trình công nhân khi dừng đợi các vật phẩm đi qua hàng đợi đầu vào.
Tôi đã nghĩ về việc sử dụng multiprocessing.Pool.map_async
trên các mục 6.1E9 có thể lặp lại của tôi, nhưng phải mất gần 10 phút để lặp qua các mục mà không làm bất cứ điều gì với chúng. Trừ khi tôi hiểu nhầm điều gì đó ..., có map_async
lặp qua chúng để gán chúng cho các quy trình có thể được thực hiện trong khi các quy trình bắt đầu công việc của chúng. (Pool
cũng cung cấp imap
nhưng documentation nói nó tương tự như map
, mà không xuất hiện để làm việc không đồng bộ Tôi muốn đồng bộ, đúng.?)
câu hỏi liên quan: Tôi muốn sử dụng concurrent.futures
thay vì multiprocessing
? Tôi không thể là người đầu tiên thực hiện một hệ thống hai hàng đợi (đó chính xác là cách mà các hãng hàng không ở Mỹ làm việc ...) vậy thì có cách nào khác để tạo ra điều này cho Pythonic/built-in?
Đây là bộ xương của những gì tôi đang cố gắng làm. Xem khối nhận xét ở giữa.
import multiprocessing as mp
import queue
def faucet(items, bathtub):
"""Fill bathtub, a process-safe queue, with 6.1e9 items"""
for item in items:
bathtub.put(item)
bathtub.close()
def drain_filter(bathtub, drain):
"""Put maximal item from bathtub into drain.
Bathtub and drain are process-safe queues.
"""
max_weight = 0
max_item = None
while True:
try:
current_item = bathtub.get()
# The following line three lines are the ones that I can't
# quite figure out how to trigger without a race condition.
# What I would love is to trigger them AFTER faucet calls
# bathtub.close and the bathtub queue is empty.
except queue.Empty:
drain.put((max_weight, max_item))
return
else:
bathtub.task_done()
if not item.is_relevant():
continue
current_weight = item.weight
if current_weight > max_weight:
max_weight = current_weight
max_item = current_item
def parallel_max(items, nprocs=30):
"""The elements of items should have a method `is_relevant`
and an attribute `weight`. `items` itself is an immutable
iterator object.
"""
bathtub_q = mp.JoinableQueue()
drain_q = mp.Queue()
faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
worker_procs = mp.Pool(processes=nprocs)
faucet_proc.start()
worker_procs.apply_async(drain_filter, bathtub_q, drain_q)
finalists = []
for i in range(nprocs):
finalists.append(drain_q.get())
return max(finalists)
Đây là Trả lời
Tôi tìm thấy một câu trả lời rất kỹ lưỡng để câu hỏi của tôi, và một giới thiệu nhẹ nhàng để đa nhiệm của đạo diễn truyền Python Foundation Doug Hellman. Những gì tôi muốn là mẫu "thuốc độc". Hãy khám phá tại đây: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html
Đạo cụ để @MRAB đăng hạt nhân của khái niệm đó.
Tại sao bạn 'nhập hàng đợi' nếu bạn đang sử dụng' multiprocessing.Queue'? –
Tôi sử dụng nó để bắt một ngoại lệ 'queue.Empty' khi nhân viên nhìn vào hàng đợi đầu vào của anh ta. Suy nghĩ mơ hồ của tôi là ngoại lệ đó có thể bị ném nếu và chỉ khi hàng đợi đã bị đóng và nó cũng trống rỗng. Lưu ý rằng theo phương pháp @ MRAB trong câu trả lời của mình, việc nhập 'hàng đợi' sẽ không cần thiết. – wkschwartz