Flask 中带有 redis 后端的 Celery 任务优先级

Celery task Priority with redis backend in Flask

下面是我简化的网站应用程序和芹菜设置

app = Flask(__name__)
celery = Celery(__name__, broker='redis://localhost:6379/0') 
celery.conf.update({app.config})

其他芹菜配置是

CELERYD_MAX_TASKS_PER_CHILD=1 
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT=['json','pickle']

我有两个任务,一个占用大量 CPU 和内存资源,需要很长时间才能完成(我想将其优先级设置为 "low"),以及另一种只是发邮件验证账号(我想设置优先级为"high"),定义如下,

@celery.task(serializer='pickle') 
def long_big_task_with_low_priority(...):
    ...   

@celery.task(serializer='pickle') 
def send_email_with_high_priority(...):
    ...

我用supervisord来执行celery

celery worker -A celerytaskfile.celery --loglevel=error --concurrency=2 --logfile=celerylogfile.log

因为第一个任务比较耗资源,所以这里设置并发2。

我的问题是当有两个第一个任务时运行,现在celery收到一个新的发送邮件的任务,是否可以立即执行发送邮件任务,以及然后在它完成后继续前两个任务?

包版本:Celery:4.1.0 redis==2.10.6

你需要两个工人。一个将仅处理 long_big_task_with_low_priority,另一个将处理其余(快速 运行)任务。为此,您需要一个单独的队列来处理 long_big_task_with_low_priority 任务。

使用 CELERY_TASK_ROUTESCELERY_ROUTES 用于较旧的 celery 版本)将慢速任务路由到新队列。

CELERY_TASK_ROUTES = {
    'long_big_task_with_low_priority': {'queue': "slow_queue"},
}

当 运行 工作人员时,分配队列(对两者都很重要,否则他们将使用任何队列中的任务)

celery 是默认队列名称 - 默认情况下所有任务都去那里:

celery worker -A celerytaskfile.celery --queues celery

第二个工人:

celery worker -A celerytaskfile.celery --queues slow_queue

http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-routes https://docs.celeryproject.org/en/latest/userguide/workers.html#queues