2012-11-07 12 views
16

Tôi muốn tạo một nhóm từ danh sách được trả về bởi tác vụ Celery, để cho mỗi mục trong tập kết quả nhiệm vụ, một nhiệm vụ sẽ được thêm vào nhóm.Làm thế nào để chuỗi một nhiệm vụ Celery trả về một danh sách thành một nhóm?

Dưới đây là một ví dụ mã đơn giản để giải thích trường hợp sử dụng. ??? phải là kết quả từ nhiệm vụ trước đó.

@celery.task 
def get_list(amount): 
    # In reality, fetch a list of items from a db 
    return [i for i in range(amount)] 

@celery.task 
def process_item(item): 
    #do stuff 
    pass 

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???)) 

Tôi có lẽ không tiếp cận này một cách chính xác, nhưng tôi khá chắc chắn nó không phải là an toàn để gọi nhiệm vụ từ bên trong nhiệm vụ:

@celery.task 
def process_list(): 
    for i in get_list.delay().get(): 
     process_item.delay(i) 

Tôi không cần kết quả từ nhiệm vụ giây .

+0

Thật vậy, hãy * không * gọi nhiệm vụ từ công việc. Điều này sẽ gây ra deadlocks. Giả sử bạn có một công nhân. Bạn gọi nhiệm vụ của bạn, mà ràng buộc công nhân 1, sau đó gọi một nhiệm vụ thứ hai. Không có nhân viên nào để xử lý nhiệm vụ đó và mọi thứ sẽ bị treo. Nastiness này được tốt hơn một chút khi bạn thêm công nhân, nhưng bạn sẽ luôn luôn được buộc lên nhiều công nhân với một nhiệm vụ duy nhất (và mất song song). – mlissner

Trả lời

29

Bạn có thể nhận loại hành vi này bằng cách sử dụng tác vụ trung gian. Dưới đây là một minh chứng về việc tạo một "bản đồ" giống như phương pháp hoạt động như bạn đã đề xuất.

from celery import task, subtask, group 

@task 
def get_list(amount): 
    return [i for i in range(amount)] 

@task 
def process_item(item): 
    # do stuff 
    pass 

@task 
def dmap(it, callback): 
    # Map a callback over an iterator and return as a group 
    callback = subtask(callback) 
    return group(callback.clone([arg,]) for arg in it)() 

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s())) 

Tín dụng để hỏi Cung cấp cho tôi đề nghị này khi tôi yêu cầu trợ giúp về vấn đề tương tự.

+1

Lưu ý rằng bản sao chỉ làm một bản sao nông. Nếu bạn muốn sao chép một chữ ký "phức tạp" (như một chuỗi, nhóm hoặc hợp âm), bạn sẽ cần phải (ab) sử dụng sâu của python, như đã đề cập trong [vấn đề cần tây 2251] (https://github.com/celery/cần tây/vấn đề/2251). Hoặc bạn di chuyển 'callback = subtask (gọi lại)' vào vòng lặp for để tạo ra các hàm và xóa 'clone'. –

+0

Tôi đã đọc nhận xét trên khoảng một chục lần và tôi không hiểu. Bạn có thể cung cấp một ví dụ, @LuisNell? – mlissner

+0

@mlissner Với mã trên, ý tôi là như sau. Nếu chúng ta giả định "gọi lại" không chỉ đơn giản là một tác vụ duy nhất, mà là một luồng công việc phức tạp (một nhóm hoặc một hợp âm), bạn không thể đơn giản sử dụng '.clone()'. Các nhóm và hợp âm có thể rất phức tạp (một nhóm các nhóm, v.v.). Trong trường hợp đó bạn không thể đơn giản sử dụng '.clone', bởi vì nó chỉ tạo ra một bản sao nông của chữ ký gọi lại của bạn. Điều này có nghĩa là các đối số sẽ không được truyền chính xác. Để đảm bảo mọi thứ hoạt động như mong đợi, bạn cần phải sử dụng 'bản in sâu ', như đã đề cập trong nhận xét ban đầu của tôi - điều đó có làm cho nó rõ ràng hơn không? nếu không, tôi sẽ thử lại. –