Sau nhiều giờ googling tôi tình cờ gặp www.manasupo.com/2012/03/chord-progress-in-celery.html. Mặc dù giải pháp không có tác dụng với tôi, nhưng nó đã truyền cảm hứng cho tôi để thử một cái gì đó tương tự.
from celery.utils import uuid
from celery import chord
class ProgressChord(chord):
def __call__(self, body=None, **kwargs):
_chord = self.Chord
body = (body or self.kwargs['body']).clone()
kwargs = dict(self.kwargs, body=body, **kwargs)
if _chord.app.conf.CELERY_ALWAYS_EAGER:
return self.apply((), kwargs)
callback_id = body.options.setdefault('task_id', uuid())
r= _chord(**kwargs)
return _chord.AsyncResult(callback_id), r
và thay vì thực hiện celery.chord tôi sử dụng ProgressChord như sau:
def temptask(n):
header=list(tempsubtask.si(i) for i in range(n))
callback=templink.si('printed at last?')
r = celery.Progresschord(celery.group(header))(callback)
return r
giá trị trả về của r chứa một tuple có cả hai, asyncResult gọi lại và kết quả nhóm. Vì vậy, thành công nhìn một cái gì đó như thế này:
In [3]: r
Out[3]:
(<AsyncResult: bf87507c-14cb-4ac4-8070-d32e4ff326a6>,
<GroupResult: af69e131-5a93-492d-b985-267484651d95 [4672cbbb-8ec3-4a9e-971a-275807124fae, a236e55f-b312-485c-a816-499d39d7de41, e825a072-b23c-43f2-b920-350413fd5c9e, e3f8378d-fd02-4a34-934b-39a5a735871d, c4f7093b-9f1a-4e5e-b90d-66f83b9c97c4, d5c7dc2c-4e10-4e71-ba2b-055a33e15f02, 07b1c6f7-fe95-4c1f-b0ba-6bc82bceaa4e, 00966cb8-41c2-4e95-b5e7-d8604c000927, e039c78e-6647-4c8d-b59b-e9baf73171a0, 6cfdef0a-25a2-4905-a40e-fea9c7940044]>)
Tôi thừa hưởng và overrode [celery.chord][1]
thay vì [celery.task.chords.Chord][2]
bởi vì tôi không thể tìm thấy nó là nguồn mọi nơi.
Nguồn
2013-03-16 06:23:10