2012-08-27 27 views
5

Một số ngữ cảnh: Tôi đang xây dựng một Ứng dụng Django cho phép người dùng lưu trước một hành động và lên lịch chính xác/thời gian trong tương lai họ muốn nói hành động để thực hiện. Ví dụ: lên lịch cho một bài đăng được đẩy theo chương trình vào tường Facebook vào tuần tới lúc 5:30 sáng.Lập kế hoạch hàng ngàn nhiệm vụ một lần (không tái) để thực hiện gần như đồng thời qua Django-celery

Tôi đang tìm một hệ thống lập kế hoạch nhiệm vụ có thể xử lý một nghìn phiên bản của tác vụ một lần, tất cả được đặt để thực thi gần như đồng thời (lề lỗi cộng hoặc trừ một phút).

Tôi đang xem xét Django-cần tây/Rabbitmq cho việc này, nhưng tôi nhận thấy các Celery docs không giải quyết các nhiệm vụ có nghĩa là để sử dụng một lần. Django-cần tây là sự lựa chọn đúng ở đây (có lẽ bằng cách phân lớp CrontabSchedule) hay năng lượng của tôi tốt hơn dành cho nghiên cứu một số cách tiếp cận khác? Có lẽ hack cùng một cái gì đó với Sched Module và Cron.

+0

Tôi không biết liệu bạn có được thông báo về bản chỉnh sửa mới nhất của tôi hay không, vì vậy tôi chỉ muốn nhận xét ở đây và chắc chắn. – dokkaebi

Trả lời

7

Chỉnh sửa 2:

Đối với một số lý do, người đứng đầu của tôi ban đầu bị mắc kẹt trong lĩnh vực công việc định kỳ. Đây là một giải pháp đơn giản hơn.

Tất cả những gì bạn thực sự cần là xác định một nhiệm vụ cho mỗi hành động của người dùng. Bạn có thể bỏ qua việc lưu trữ các tác vụ được thực hiện trong cơ sở dữ liệu của bạn - đó là những gì cần tây ở đây!

Hãy sử dụng lại ví dụ trên facebook của bạn, và một lần nữa giả sử bạn có chức năng post_to_facebook ở đâu đó, một người dùng và một số văn bản, thực hiện một số phép thuật và đăng văn bản lên facebook của người dùng đó, bạn có thể xác định nó là nhiệm vụ như thế này:

# Task to send one update. 
@celery.task(ignore_result=True) 
def post_to_facebook(user, text): 
    # perform magic 
    return whatever_you_want 

khi người dùng đã sẵn sàng để enqueue một bài như vậy, bạn chỉ cần nói với cần tây khi để chạy các nhiệm vụ:

post_to_facebook.apply_async(
    (user, text), # args 
    eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440) # pass execution options as kwargs 
) 

đây là tất cả các chi tiết ở đây, trong một bó toàn bộ các tùy chọn cuộc gọi khả dụng: http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown

Nếu bạn cần kết quả của cuộc gọi, bạn có thể bỏ qua param ignore_result trong định nghĩa nhiệm vụ và nhận lại đối tượng AsyncResult, sau đó kiểm tra nó để biết kết quả cuộc gọi. Thêm tại đây: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

Một số câu trả lời dưới đây vẫn có liên quan. Bạn vẫn muốn có một nhiệm vụ cho mỗi hành động của người dùng, bạn vẫn muốn suy nghĩ về thiết kế nhiệm vụ, vv, nhưng đây là một con đường đơn giản hơn nhiều để làm những gì bạn hỏi.

Original câu trả lời bằng việc định kỳ sau:

Dannyroa có ý tưởng đúng. Tôi sẽ xây dựng trên đó một chút ở đây.

Edit/TLDR: Câu trả lời là , cần tây là phù hợp với nhu cầu của bạn. Bạn chỉ có thể cần phải suy nghĩ lại định nghĩa nhiệm vụ của bạn.

Tôi cho rằng bạn không cho phép người dùng viết mã Python tùy ý để xác định nhiệm vụ của họ. Tóm lại, bạn sẽ phải xác định trước một số hành động mà người dùng có thể lên lịch và sau đó cho phép họ lên lịch cho những hành động đó theo ý muốn. Sau đó, bạn chỉ có thể chạy một tác vụ được lập biểu cho mỗi hành động của người dùng, kiểm tra các mục nhập và thực hiện hành động cho mỗi mục nhập.

Một người dùng hành động:

Sử dụng ví dụ Facebook của bạn, bạn sẽ lưu trữ thông tin cập nhật của người sử dụng trong một bảng:

class ScheduledPost(Model): 
    user = ForeignKey('auth.User') 
    text = TextField() 
    time = DateTimeField() 
    sent = BooleanField(default=False) 

Sau đó, bạn sẽ chạy một nhiệm vụ từng phút, kiểm tra các mục trong đó bảng được lên lịch để được đăng vào phút cuối (dựa trên lề lỗi bạn đã đề cập). Nếu điều quan trọng là bạn nhấn cửa sổ một phút, bạn có thể lên lịch công việc thường xuyên hơn, cứ 30 giây một lần. Các thể nhiệm vụ giống như thế này (trong myapp/tasks.py):

@celery.task 
def post_scheduled_updates(): 
    from celery import current_task 
    scheduled_posts = ScheduledPost.objects.filter(
     sent=False, 
     time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this 
     time__lte=timezone.now() 
    ) 
    for post in scheduled_posts: 
     if post_to_facebook(post.text): 
      post.sent = True 
      post.save() 

Các cấu hình có thể trông như thế này:

CELERYBEAT_SCHEDULE = { 
    'fb-every-30-seconds': { 
     'task': 'tasks.post_scheduled_updates', 
     'schedule': timedelta(seconds=30), 
    }, 
} 

bổ sung sử dụng hành động:

Đối với mỗi hành động người dùng ngoài việc đăng lên Facebook, bạn có thể xác định bảng mới và tác vụ mới:

class EmailToMom(Model): 
    user = ForeignKey('auth.User') 
    text = TextField() 
    subject = CharField(max_length=255) 
    sent = BooleanField(default=False) 
    time = DateTimeField() 

@celery.task 
def send_emails_to_mom(): 
    scheduled_emails = EmailToMom.objects.filter(
     sent=False, 
     time__lt=timezone.now() 
    ) 
    for email in scheduled_emails: 
     sent = send_mail(
      email.subject, 
      email.text, 
      email.user.email, 
      [email.user.mom.email], 
     ) 
     if sent: 
      email.sent = True 
      email.save() 

    CELERYBEAT_SCHEDULE = { 
     'fb-every-30-seconds': { 
      'task': 'tasks.post_scheduled_updates', 
      'schedule': timedelta(seconds=30), 
     }, 
     'mom-every-30-seconds': { 
      'task': 'tasks.send_emails_to_mom', 
      'schedule': timedelta(seconds=30), 
     }, 
    } 

Tốc độ và tối ưu hóa:

Để có được thông hơn, thay vì lặp lại qua các bản cập nhật để gửi và gửi chúng serially trong khi gọi post_scheduled_updates, bạn có thể đẻ trứng lên một loạt các nhiệm vụ nhỏ và làm cho họ song song (cho đủ workers). Sau đó, các cuộc gọi đến post_scheduled_updates chạy rất nhanh chóng và lịch trình một bó toàn bộ nhiệm vụ - một cho mỗi cập nhật fb - để chạy càng sớm càng tốt. Điều đó sẽ trông giống như sau:

# Task to send one update. This will be called by post_scheduled_updates. 
@celery.task 
def post_one_update(update_id): 
    try: 
     update = ScheduledPost.objects.get(id=update_id) 
    except ScheduledPost.DoesNotExist: 
     raise 
    else: 
     sent = post_to_facebook(update.text) 
     if sent: 
      update.sent = True 
      update.save() 
     return sent 

@celery.task 
def post_scheduled_updates(): 
    from celery import current_task 
    scheduled_posts = ScheduledPost.objects.filter(
     sent=False, 
     time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this 
     time__lte=timezone.now() 
    ) 
    for post in scheduled_posts: 
     post_one_update.delay(post.id) 

Mã tôi đã đăng không được kiểm tra và chắc chắn không được tối ưu hóa, nhưng nó sẽ giúp bạn đi đúng hướng. Trong câu hỏi của bạn, bạn ngụ ý một số lo ngại về thông lượng, vì vậy bạn sẽ muốn xem xét kỹ các địa điểm để tối ưu hóa. Một rõ ràng là cập nhật hàng loạt thay vì lặp lại gọi post.sent=True;post.save().

Thông tin thêm:

Thông tin thêm về nhiệm vụ tuần hoàn: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html.

Một phần về chiến lược thiết kế nhiệm vụ: http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies

Có một trang toàn bộ về tối ưu hóa cần tây ở đây: http://docs.celeryproject.org/en/latest/userguide/optimizing.html.

Trang này về các công việc phụ cũng có thể thú vị: http://docs.celeryproject.org/en/latest/userguide/canvas.html.

Thực tế, tôi khuyên bạn nên đọc tất cả tài liệu cần tây.

+0

Tôi không thể cảm ơn đủ, câu trả lời của bạn vô cùng hữu ích đối với tôi. –

+0

Tuyệt vời. Mừng vì tôi có thể giúp! – dokkaebi

+0

Một số nhiệm vụ của bạn thất bại trong quy tắc không cần thiết quan trọng của nhiệm vụ cần tây. Ví dụ: '' ' @ celery.task def send_emails_to_mom(): scheduled_emails = EmailToMom.objects.filter ( gửi = False, time__lt = timezone.now() ) cho email trong scheduled_emails: ' '' Nếu tác vụ này được bắt đầu hai lần cùng một lúc, cả hai sẽ nhận được cùng một danh sách các email được lên lịch để gửi và sau đó bắt đầu gửi đến cùng một danh sách. – dalore

0

Điều tôi sẽ làm là tạo mô hình có tên ScheduledPost.

Tôi sẽ có một PeriodicTask chạy sau mỗi 5 phút.

Nhiệm vụ sẽ kiểm tra bảng ScheduledPost cho bất kỳ bài đăng nào cần được đẩy lên Facebook.