2013-03-15 10 views
7
import celery 
def temptask(n): 
    header=list(tempsubtask.si(i) for i in range(n)) 
    callback=templink.si('printed at last?') 
    r = celery.chord(celery.group(header))(callback) 
    return r 

@task() 
def tempsubtask(i): 
    print i  
    for x in range(i): 
     time.sleep(2) 
     current_task.update_state(
      state='PROGRESS', meta={'completed': x, 'total': i }) 

@task() 
def templink(x): 
    print 'this should be run at last %s'%x 

#executing temptask 
r = temptask(100) 

Tôi muốn thừa nhận trạng thái tiến trình được cập nhật bởi tempsubtask. Làm thế nào tôi có thể đi về việc đạt được nó?Làm thế nào để theo dõi tiến độ của các tác vụ riêng lẻ bên trong một nhóm tạo thành tiêu đề cho một hợp âm trong cần tây?

Trả lời

4

Sau nhiều giờ googling tôi tình cờ gặp www.manasupo.com/2012/03/chord-progress-in-celery.html. Mặc dù giải pháp không có tác dụng với tôi, nhưng nó đã truyền cảm hứng cho tôi để thử một cái gì đó tương tự.

from celery.utils import uuid 
from celery import chord 

class ProgressChord(chord): 

    def __call__(self, body=None, **kwargs): 
     _chord = self.Chord 
     body = (body or self.kwargs['body']).clone() 
     kwargs = dict(self.kwargs, body=body, **kwargs) 
     if _chord.app.conf.CELERY_ALWAYS_EAGER: 
      return self.apply((), kwargs) 
     callback_id = body.options.setdefault('task_id', uuid()) 
     r= _chord(**kwargs) 
     return _chord.AsyncResult(callback_id), r 

và thay vì thực hiện celery.chord tôi sử dụng ProgressChord như sau:

def temptask(n): 
    header=list(tempsubtask.si(i) for i in range(n)) 
    callback=templink.si('printed at last?') 
    r = celery.Progresschord(celery.group(header))(callback) 
    return r 

giá trị trả về của r chứa một tuple có cả hai, asyncResult gọi lại và kết quả nhóm. Vì vậy, thành công nhìn một cái gì đó như thế này:

In [3]: r 
Out[3]: 
(<AsyncResult: bf87507c-14cb-4ac4-8070-d32e4ff326a6>, 
<GroupResult: af69e131-5a93-492d-b985-267484651d95 [4672cbbb-8ec3-4a9e-971a-275807124fae, a236e55f-b312-485c-a816-499d39d7de41, e825a072-b23c-43f2-b920-350413fd5c9e, e3f8378d-fd02-4a34-934b-39a5a735871d, c4f7093b-9f1a-4e5e-b90d-66f83b9c97c4, d5c7dc2c-4e10-4e71-ba2b-055a33e15f02, 07b1c6f7-fe95-4c1f-b0ba-6bc82bceaa4e, 00966cb8-41c2-4e95-b5e7-d8604c000927, e039c78e-6647-4c8d-b59b-e9baf73171a0, 6cfdef0a-25a2-4905-a40e-fea9c7940044]>) 

Tôi thừa hưởng và overrode [celery.chord][1] thay vì [celery.task.chords.Chord][2] bởi vì tôi không thể tìm thấy nó là nguồn mọi nơi.