2013-09-05 66 views
6

Tôi hiện đang sử dụng django với cần tây và mọi thứ hoạt động tốt.Django Celery nhận nhiệm vụ

Tuy nhiên tôi muốn có thể cho người dùng cơ hội hủy tác vụ nếu máy chủ bị quá tải bằng cách kiểm tra số lượng tác vụ hiện được lên lịch.

Tôi làm cách nào để đạt được điều này?

Tôi đang sử dụng redis làm môi giới.

Tôi chỉ thấy câu này: Retrieve list of tasks in a queue in Celery

Nó được bằng cách nào đó liên quan đến vấn đề của tôi nhưng tôi không cần phải liệt kê các nhiệm vụ, chỉ cần đếm số họ :)

Trả lời

8

Nếu nhà môi giới của bạn được cấu hình như redis://localhost:6379/1, và nhiệm vụ của bạn được nộp cho tổng celery hàng đợi, sau đó bạn có thể lấy chiều dài bằng các phương tiện sau:

import redis 
queue_name = "celery" 
client = redis.Redis(host="localhost", port=6379, db=1) 
length = client.llen(queue_name) 

Hoặc, từ một kịch bản shell (tốt cho màn hình và ví dụ):

$ redis-cli -n 1 -h localhost -p 6379 llen celery 
+0

Mặc dù đây là một giải pháp chính xác cho các nhà môi giới redis, hãy đánh dấu @stephen bình luận Fuhry như giải pháp đúng vì nó là môi giới bất khả tri . –

4

Nếu bạn đã cấu hình redis trong ứng dụng của bạn, bạn có thể thử này:

from celery import Celery 

QUEUE_NAME = 'celery' 

celery = Celery(app) 
client = celery.connection().channel().client 

length = client.llen(QUEUE_NAME) 
+0

Đối với redis, 'client = app.broker_connection(). Channel(). Client' –

7

Đây là cách bạn có thể nhận được số lượng tin nhắn trong một hàng đợi sử dụng cần tây đó là broker- bất khả tri.

Bằng cách sử dụng connection_or_acquire, bạn có thể giảm thiểu số lượng kết nối mở cho nhà môi giới của mình bằng cách sử dụng kết nối nội bộ của cần tây.

celery = Celery(app) 

with celery.connection_or_acquire() as conn: 
    conn.default_channel.queue_declare(
     queue='my-queue', passive=True).message_count 

Bạn cũng có thể mở rộng cần tây để cung cấp chức năng này:

from celery import Celery as _Celery 


class Celery(_Celery) 

    def get_message_count(self, queue): 
     ''' 
     Raises: amqp.exceptions.NotFound: if queue does not exist 
     ''' 
     with self.connection_or_acquire() as conn: 
      return conn.default_channel.queue_declare(
       queue=queue, passive=True).message_count 


celery = Celery(app) 
num_messages = celery.get_message_count('my-queue') 
+2

Vui lòng cung cấp một số loại giải thích để hỗ trợ câu trả lời của bạn. – Lal

+0

@Lal Đã thêm một số giải thích về cách tiếp cận - hy vọng điều đó sẽ hữu ích! –

+1

amqp.exceptions.NotFound: Queue.declare: (404) NOT_FOUND - không có hàng đợi 'mặc định' trong vhost '/' Vì hàng đợi của tôi không nằm trên máy chủ '/' trên máy chủ '/ táo'. Làm thế nào để tôi đến được máy chủ đó? – Simanas