Python 将任务路由到特定 RabbitMQ 队列时 Celery 的奇怪行为
Python Celery weird behavior when routing tasks to specific RabbitMQ queues
我为此苦苦思索了一段时间,但仍然无法弄清楚问题出在哪里。我正在尝试将不同的任务路由到他们自己的特定队列。在 RabbitMQ 中,我有默认的 celery
队列和另一个名为 test_queue
的队列。我可以将任务推送到 celery
就好了,但是我无法弄清楚如何将任务推送到 test_queue
队列。我已经阅读了芹菜设置 task_default_queue
、task_queue
和 task_routes
.
的文档
celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('celery_test',
broker_url='amqp://',
backend='amqp://',
include=['celery_test.tasks'],
worker_max_tasks_per_child=1,
task_create_missing_queues=True,
#task_queues = {'test_queue': {'exchange': 'test_queue', 'routing_key': 'test_queue'}},
#task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
#task_default_queue='test_queue'
)
# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600,)
app.conf.task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
if __name__ == '__main__':
app.start()
我注意到当我在 Celery
构造函数中指定 test_queue
队列名称时,我的任务没有正确路由到正确的 RabbitMQ 队列。相反,我不得不使用 app.conf.task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
来完成工作。我知道这看起来微不足道,但我已经为此苦苦思索了几个小时,我无法弄清楚为什么一种方法有效而另一种方法无效。
这是我的 celeryconfig.py
broker_url="amqp://"
result_backend="amqp://"
include=["tasks"]
task_acks_late=True
task_default_rate_limit="150/m"
task_time_limit=300
worker_prefetch_multiplier=1
worker_max_tasks_per_child=2
task_routes={'tasks.test': {'queue': 'test_queue'}}
我正在使用
- 4.3.0(大黄)
- Python 3.5.2
据我所知,Celery 4.x (class) 构造函数不采用您提到的那些参数(task_queues、task_routes 和 task_default_queue).
我为此苦苦思索了一段时间,但仍然无法弄清楚问题出在哪里。我正在尝试将不同的任务路由到他们自己的特定队列。在 RabbitMQ 中,我有默认的 celery
队列和另一个名为 test_queue
的队列。我可以将任务推送到 celery
就好了,但是我无法弄清楚如何将任务推送到 test_queue
队列。我已经阅读了芹菜设置 task_default_queue
、task_queue
和 task_routes
.
celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('celery_test',
broker_url='amqp://',
backend='amqp://',
include=['celery_test.tasks'],
worker_max_tasks_per_child=1,
task_create_missing_queues=True,
#task_queues = {'test_queue': {'exchange': 'test_queue', 'routing_key': 'test_queue'}},
#task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
#task_default_queue='test_queue'
)
# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600,)
app.conf.task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
if __name__ == '__main__':
app.start()
我注意到当我在 Celery
构造函数中指定 test_queue
队列名称时,我的任务没有正确路由到正确的 RabbitMQ 队列。相反,我不得不使用 app.conf.task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
来完成工作。我知道这看起来微不足道,但我已经为此苦苦思索了几个小时,我无法弄清楚为什么一种方法有效而另一种方法无效。
这是我的 celeryconfig.py
broker_url="amqp://"
result_backend="amqp://"
include=["tasks"]
task_acks_late=True
task_default_rate_limit="150/m"
task_time_limit=300
worker_prefetch_multiplier=1
worker_max_tasks_per_child=2
task_routes={'tasks.test': {'queue': 'test_queue'}}
我正在使用
- 4.3.0(大黄)
- Python 3.5.2
据我所知,Celery 4.x (class) 构造函数不采用您提到的那些参数(task_queues、task_routes 和 task_default_queue).