芹菜一个经纪人多个队列和工人
celery one broker multiple queues and workers
我有一个名为 tasks.py
的 python 文件,我在其中定义了 4 个单一任务。我想配置 celery 以使用 4 个队列,因为每个队列都会分配不同数量的工作人员。我正在阅读我应该使用 route_task 属性 但我尝试了几种选择但没有成功。
我正在关注这个文档 celery route_tasks docs
我的目标是 运行 4 个工作人员,每个工作人员一个,并且不要将来自不同工作人员的任务混合在不同的队列中。这是可能的?这是一个好方法吗?
如果我做错了什么,我很乐意更改我的代码以使其正常工作
这是我目前的配置
tasks.py
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('queueA', routing_key='tasks.task_1'),
Queue('queueB', routing_key='tasks.task_2'),
Queue('queueC', routing_key='tasks.task_3'),
Queue('queueD', routing_key='tasks.task_4')
)
@app.task
def task_1():
print "Task of level 1"
@app.task
def task_2():
print "Task of level 2"
@app.task
def task_3():
print "Task of level 3"
@app.task
def task_4():
print "Task of level 4"
运行 celery 每个队列一个工人
celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
celery -A tasks worker --loglevel=debug -Q queueC --logfile=celery-C.log -n W3&
celery -A tasks worker --loglevel=debug -Q queueD --logfile=celery-D.log -n W4&
无需为将任务提交到不同队列而进行复杂的路由。照常定义您的任务。
from celery import celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def task_1():
print "Task of level 1"
@app.task
def task_2():
print "Task of level 2"
现在在对任务进行排队时,将任务放入适当的队列中。这是有关如何操作的示例。
In [12]: from tasks import *
In [14]: result = task_1.apply_async(queue='queueA')
In [15]: result = task_2.apply_async(queue='queueB')
这会将 task_1
放入名为 queueA
的队列,将 task_2
放入 queueB
。
现在您可以让您的工作人员开始使用它们了。
celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
注意:task
和 message
在答案中可以互换使用。它基本上是 producer
发送到 RabbitMQ
的有效负载
您可以遵循 Chillar 建议的方法,也可以定义和使用 task_routes
配置将消息路由到适当的队列。这样您就不需要在每次调用 apply_async
.
时都指定队列名称
示例:路由 task1 到 QueueA
和路由 task2 到 QueueB
app = Celery('my_app')
app.conf.update(
task_routes={
'task1': {'queue': 'QueueA'},
'task2': {'queue': 'QueueB'}
}
)
将任务发送到多个队列有点棘手。您将必须声明交换,然后使用适当的 routing_key
路由您的任务。您可以获得有关交换类型的更多信息 here。为了便于说明,让我们使用 direct
。
创建交易所
from kombu import Exchange, Queue, binding
exchange_for_queueA_and_B = Exchange('exchange_for_queueA_and_B', type='direct')
在该交换的队列上创建绑定
app.conf.update(
task_queues=(
Queue('QueueA', [
binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b')
]),
Queue('QueueB', [
binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b')
])
)
)
定义task_route
发送task1到exchange
app.conf.update(
task_routes={
'task1': {'exchange': 'exchange_for_queueA_and_B', 'routing_key': 'queue_a_and_b'}
}
)
您还可以按照 Chillar 在上述答案中的建议,在您的 apply_async
方法中声明 exchange
和 routing_key
的这些选项。
之后,您可以在同一台机器或不同机器上定义您的工作人员,以从这些队列中消费。
celery -A my_app worker -n consume_from_QueueA_and_QueueB -Q QueueA,QueueB
celery -A my_app worker -n consume_from_QueueA_only -Q QueueA
我有一个名为 tasks.py
的 python 文件,我在其中定义了 4 个单一任务。我想配置 celery 以使用 4 个队列,因为每个队列都会分配不同数量的工作人员。我正在阅读我应该使用 route_task 属性 但我尝试了几种选择但没有成功。
我正在关注这个文档 celery route_tasks docs
我的目标是 运行 4 个工作人员,每个工作人员一个,并且不要将来自不同工作人员的任务混合在不同的队列中。这是可能的?这是一个好方法吗?
如果我做错了什么,我很乐意更改我的代码以使其正常工作
这是我目前的配置
tasks.py
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('queueA', routing_key='tasks.task_1'),
Queue('queueB', routing_key='tasks.task_2'),
Queue('queueC', routing_key='tasks.task_3'),
Queue('queueD', routing_key='tasks.task_4')
)
@app.task
def task_1():
print "Task of level 1"
@app.task
def task_2():
print "Task of level 2"
@app.task
def task_3():
print "Task of level 3"
@app.task
def task_4():
print "Task of level 4"
运行 celery 每个队列一个工人
celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
celery -A tasks worker --loglevel=debug -Q queueC --logfile=celery-C.log -n W3&
celery -A tasks worker --loglevel=debug -Q queueD --logfile=celery-D.log -n W4&
无需为将任务提交到不同队列而进行复杂的路由。照常定义您的任务。
from celery import celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def task_1():
print "Task of level 1"
@app.task
def task_2():
print "Task of level 2"
现在在对任务进行排队时,将任务放入适当的队列中。这是有关如何操作的示例。
In [12]: from tasks import *
In [14]: result = task_1.apply_async(queue='queueA')
In [15]: result = task_2.apply_async(queue='queueB')
这会将 task_1
放入名为 queueA
的队列,将 task_2
放入 queueB
。
现在您可以让您的工作人员开始使用它们了。
celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
注意:task
和 message
在答案中可以互换使用。它基本上是 producer
发送到 RabbitMQ
您可以遵循 Chillar 建议的方法,也可以定义和使用 task_routes
配置将消息路由到适当的队列。这样您就不需要在每次调用 apply_async
.
示例:路由 task1 到 QueueA
和路由 task2 到 QueueB
app = Celery('my_app')
app.conf.update(
task_routes={
'task1': {'queue': 'QueueA'},
'task2': {'queue': 'QueueB'}
}
)
将任务发送到多个队列有点棘手。您将必须声明交换,然后使用适当的 routing_key
路由您的任务。您可以获得有关交换类型的更多信息 here。为了便于说明,让我们使用 direct
。
创建交易所
from kombu import Exchange, Queue, binding exchange_for_queueA_and_B = Exchange('exchange_for_queueA_and_B', type='direct')
在该交换的队列上创建绑定
app.conf.update( task_queues=( Queue('QueueA', [ binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b') ]), Queue('QueueB', [ binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b') ]) ) )
定义
task_route
发送task1到exchangeapp.conf.update( task_routes={ 'task1': {'exchange': 'exchange_for_queueA_and_B', 'routing_key': 'queue_a_and_b'} } )
您还可以按照 Chillar 在上述答案中的建议,在您的 apply_async
方法中声明 exchange
和 routing_key
的这些选项。
之后,您可以在同一台机器或不同机器上定义您的工作人员,以从这些队列中消费。
celery -A my_app worker -n consume_from_QueueA_and_QueueB -Q QueueA,QueueB
celery -A my_app worker -n consume_from_QueueA_only -Q QueueA