2013-03-01 17 views
10

Theo hướng dẫn cần tây liên quan đến real-time monitoring of celery workers, người ta cũng có thể nắm bắt theo chương trình các sự kiện do công nhân sản xuất và thực hiện hành động tương ứng.Làm thế nào để theo dõi các sự kiện từ công nhân trong ứng dụng Celery-Django?

Câu hỏi của tôi là làm cách nào tôi có thể tích hợp màn hình dưới dạng màn hình trong ví dụ this, trong ứng dụng Celery-Django?

EDIT: Các mã ví dụ trong hướng dẫn trông giống như:

from celery import Celery 

def my_monitor(app): 
    state = app.events.State() 

    def announce_failed_tasks(event): 
     state.event(event) 
     task_id = event['uuid'] 

     print('TASK FAILED: %s[%s] %s' % (
      event['name'], task_id, state[task_id].info(),)) 
    with app.connection() as connection: 
     recv = app.events.Receiver(connection, handlers={ 
       'task-failed': announce_failed_tasks, 
       'worker-heartbeat': announce_dead_workers, 
     }) 
     recv.capture(limit=None, timeout=None, wakeup=True) 

if __name__ == '__main__': 
    celery = Celery(broker='amqp://[email protected]//') 
    my_monitor(celery) 

Vì vậy, tôi muốn chụp sự kiện task_failed gửi bởi người lao động, và để có được task_id của nó giống như các chương trình hướng dẫn, để có được những kết quả cho nhiệm vụ này từ backend kết quả đã được cấu hình cho ứng dụng của tôi và xử lý nó thêm nữa. Vấn đề của tôi là nó không phải là rõ ràng với tôi làm thế nào để có được các ứng dụng, như trong một dự án django-cần tây nó không phải là minh bạch cho tôi sự khởi tạo của thư viện Celery.

Tôi cũng mở cho bất kỳ ý tưởng nào khác về cách xử lý kết quả khi một công nhân đã hoàn thành việc thực hiện một tác vụ.

+0

Tôi nghĩ rằng bạn sẽ có được một chút cụ thể hơn, những sự kiện cần chụp? Bạn có mã ví dụ nào không? – danodonovan

Trả lời

14

Ok, tôi đã tìm ra cách để làm điều này, mặc dù tôi không chắc chắn đây là giải pháp, nhưng nó hoạt động cho tôi. Chức năng giám sát về cơ bản kết nối trực tiếp với người môi giới và lắng nghe các loại sự kiện khác nhau. Mã của tôi trông giống như sau:

from celery.events import EventReceiver 
from kombu import Connection as BrokerConnection 

def my_monitor: 
    connection = BrokerConnection('amqp://guest:[email protected]:5672//') 

    def on_event(event): 
     print "EVENT HAPPENED: ", event 

    def on_task_failed(event): 
     exception = event['exception'] 
     print "TASK FAILED!", event, " EXCEPTION: ", exception 

    while True: 
     try: 
      with connection as conn: 
       recv = EventReceiver(conn, 
           handlers={'task-failed' : on_task_failed, 
              'task-succeeded' : on_event, 
              'task-sent' : on_event, 
              'task-received' : on_event, 
              'task-revoked' : on_event, 
              'task-started' : on_event, 
              # OR: '*' : on_event 
              }) 
      recv.capture(limit=None, timeout=None) 
    except (KeyboardInterrupt, SystemExit): 
     print "EXCEPTION KEYBOARD INTERRUPT" 
     sys.exit() 

Đây là tất cả. Và tôi chạy điều này trong một quy trình khác với ứng dụng bình thường, có nghĩa là tôi tạo ra một tiến trình con của ứng dụng cần tây của tôi chỉ chạy chức năng này. HTH

+0

Xin chào, cảm ơn, câu hỏi của bạn về cơ bản là những gì tôi đang cố gắng làm bây giờ. Bạn đang đặt mã này trong dự án Django của bạn ở đâu? Bạn có thể giải thích về việc tạo ra quá trình con của ứng dụng cần tây của bạn? Hiện tại, ứng dụng cần tây của tôi được định cấu hình trong 'myproj/myproj/celery.py' (theo http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery -with-django) – fpghost

+1

Xin chào! Tôi đã không làm việc này trong một thời gian dài, vì vậy mọi thứ có thể đã thay đổi trong bản thân Celery trong các bản phát hành cuối cùng. Về cơ bản tôi đã bắt đầu một quá trình daemon Python như: daemon_process = Process (target = results_processing.my_monitor) daemon_process.daemon = True daemon_process.start() trong một trong các mô-đun được gọi khi ứng dụng bắt đầu – Clara

+0

Tôi đã sử dụng Django và khởi xướng màn hình này qua đó. Trong Django <1.9 tôi đã sử dụng để có thể bắt đầu theo dõi trong tập tin 'proj/proj/celery.py', chỉ bằng' my_monitor (app) 'sau khi xác định ứng dụng cần tây. Bây giờ trong Django 1.9 mà kết quả trong một 'AppRegistryNotReady' exc (Tôi nghĩ rằng các mô hình nhập khẩu trong' __init __. Py' của ứng dụng hiện nay không được phép --- Tôi nên lưu ý rằng màn hình của tôi dựa trên một số mô hình). Tôi đã bắt đầu khởi động màn hình trong phương thức 'AppConfig.ready()' của ứng dụng django có các mô hình mà màn hình của tôi dựa vào (điều này đảm bảo rằng ứng dụng đã hoàn tất đăng ký). HTH – fpghost

4

Cẩn thận với một vài gotchas

  1. Bạn cần phải thiết lập CELERY_SEND_EVENTS cờ như đúng trong cấu hình cần tây bạn.
  2. Bạn cũng có thể đặt trình giám sát sự kiện trong một chủ đề mới từ nhân viên của mình.

Đây là triển khai thực hiện của tôi:

class MonitorThread(object): 
    def __init__(self, celery_app, interval=1): 
     self.celery_app = celery_app 
     self.interval = interval 

     self.state = self.celery_app.events.State() 

     self.thread = threading.Thread(target=self.run, args=()) 
     self.thread.daemon = True 
     self.thread.start() 

    def catchall(self, event): 
     if event['type'] != 'worker-heartbeat': 
      self.state.event(event) 

     # logic here 

    def run(self): 
     while True: 
      try: 
       with self.celery_app.connection() as connection: 
        recv = self.celery_app.events.Receiver(connection, handlers={ 
         '*': self.catchall 
        }) 
        recv.capture(limit=None, timeout=None, wakeup=True) 

      except (KeyboardInterrupt, SystemExit): 
       raise 

      except Exception: 
       # unable to capture 
       pass 

      time.sleep(self.interval) 

if __name__ == '__main__': 
    app = get_celery_app() # returns app 
    MonitorThread(app) 
    app.start()