启动 celery worker 并为广播队列启用它
start celery worker and enable it for broadcast queue
我正在尝试启动 celery worker,因此它只听单个队列。这不是问题,我可以这样做:
python -m celery worker -A my_module -Q my_queue -c 1
但现在我也希望这个 my_queue
队列成为一个广播队列,所以我在我的 celeryconfig 中这样做:
from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)
但是一旦我这样做,我就无法再启动我的 worker,我从 rabbitmq 收到错误消息:
amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'
如果我在没有 -Q
的情况下启动 worker(但是如上所述将 Broadcast
留在 celeryconfig.py
中)并且我列出了 rabbitmq 队列,我可以看到广播队列是这样创建和命名的:
bcast.43fecba7-786a-461c-a322-620039b29b8b
同样,如果我在 worker 中定义此队列(如上所述使用 -Q
)或像这样在 celeryconfig.py
中定义为简单的 Queue
:
from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)
我可以这样在 rabbitmq 中看到这个队列:
my_queue
似乎在定义队列时我在 Broadcast
调用中输入了什么并不重要 - 这似乎是内部芹菜名称,没有传递给 rabbitmq。
所以我猜当 worker 开始时 my_queue
被创建,一旦完成就无法创建 Broadcast
.
我可以让工作人员监听任何队列(不仅是 my_queue
),我将从删除 -Q
参数开始。但是如果能够有一个只监听那个特定队列的进程会很好,因为我投入其中的任务很快,我想尽可能地降低延迟。
--- 编辑 1 ---
花了一些时间解决这个问题,上面提到的 bcast
队列似乎并没有始终如一地出现。重置没有 -Q
选项的 rabbitmq 和 运行 celery 后 bcast
队列没有出现...
使用代理发送消息时,客户端和工作线程必须就相同的配置值达成一致。如果您必须更改配置,则需要清除现有消息并重新启动所有内容,以便它们同步。
启动广播队列时,您可以设置交换类型并配置队列。
from kombu.common import Broadcast
from kombu import Exchange
exchange = Exchange('custom_exchange', type='fanout')
CELERY_QUEUES = (
Broadcast(name='bcast', exchange=exchange),
)
现在您可以使用
启动 worker
celery worker -l info -A tasks -Q bcast
我正在尝试启动 celery worker,因此它只听单个队列。这不是问题,我可以这样做:
python -m celery worker -A my_module -Q my_queue -c 1
但现在我也希望这个 my_queue
队列成为一个广播队列,所以我在我的 celeryconfig 中这样做:
from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)
但是一旦我这样做,我就无法再启动我的 worker,我从 rabbitmq 收到错误消息:
amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'
如果我在没有 -Q
的情况下启动 worker(但是如上所述将 Broadcast
留在 celeryconfig.py
中)并且我列出了 rabbitmq 队列,我可以看到广播队列是这样创建和命名的:
bcast.43fecba7-786a-461c-a322-620039b29b8b
同样,如果我在 worker 中定义此队列(如上所述使用 -Q
)或像这样在 celeryconfig.py
中定义为简单的 Queue
:
from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)
我可以这样在 rabbitmq 中看到这个队列:
my_queue
似乎在定义队列时我在 Broadcast
调用中输入了什么并不重要 - 这似乎是内部芹菜名称,没有传递给 rabbitmq。
所以我猜当 worker 开始时 my_queue
被创建,一旦完成就无法创建 Broadcast
.
我可以让工作人员监听任何队列(不仅是 my_queue
),我将从删除 -Q
参数开始。但是如果能够有一个只监听那个特定队列的进程会很好,因为我投入其中的任务很快,我想尽可能地降低延迟。
--- 编辑 1 ---
花了一些时间解决这个问题,上面提到的 bcast
队列似乎并没有始终如一地出现。重置没有 -Q
选项的 rabbitmq 和 运行 celery 后 bcast
队列没有出现...
使用代理发送消息时,客户端和工作线程必须就相同的配置值达成一致。如果您必须更改配置,则需要清除现有消息并重新启动所有内容,以便它们同步。
启动广播队列时,您可以设置交换类型并配置队列。
from kombu.common import Broadcast
from kombu import Exchange
exchange = Exchange('custom_exchange', type='fanout')
CELERY_QUEUES = (
Broadcast(name='bcast', exchange=exchange),
)
现在您可以使用
启动 workercelery worker -l info -A tasks -Q bcast