启动 celery worker 并为广播队列启用它

start celery worker and enable it for broadcast queue

我正在尝试启动 celery worker,因此它只听单个队列。这不是问题,我可以这样做:

python -m celery worker -A my_module -Q my_queue -c 1

但现在我也希望这个 my_queue 队列成为一个广播队列,所以我在我的 celeryconfig 中这样做:

from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)

但是一旦我这样做,我就无法再启动我的 worker,我从 rabbitmq 收到错误消息:

amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'

如果我在没有 -Q 的情况下启动 worker(但是如上所述将 Broadcast 留在 celeryconfig.py 中)并且我列出了 rabbitmq 队列,我可以看到广播队列是这样创建和命名的:

bcast.43fecba7-786a-461c-a322-620039b29b8b

同样,如果我在 worker 中定义此队列(如上所述使用 -Q)或像这样在 celeryconfig.py 中定义为简单的 Queue

from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)

我可以这样在 rabbitmq 中看到这个队列:

my_queue

似乎在定义队列时我在 Broadcast 调用中输入了什么并不重要 - 这似乎是内部芹菜名称,没有传递给 rabbitmq。

所以我猜当 worker 开始时 my_queue 被创建,一旦完成就无法创建 Broadcast.

我可以让工作人员监听任何队列(不仅是 my_queue),我将从删除 -Q 参数开始。但是如果能够有一个只监听那个特定队列的进程会很好,因为我投入其中的任务很快,我想尽可能地降低延迟。

--- 编辑 1 --- 花了一些时间解决这个问题,上面提到的 bcast 队列似乎并没有始终如一地出现。重置没有 -Q 选项的 rabbitmq 和 运行 celery 后 bcast 队列没有出现...

使用代理发送消息时,客户端和工作线程必须就相同的配置值达成一致。如果您必须更改配置,则需要清除现有消息并重新启动所有内容,以便它们同步。

启动广播队列时,您可以设置交换类型并配置队列。

from kombu.common import Broadcast
from kombu import Exchange


exchange = Exchange('custom_exchange', type='fanout')

CELERY_QUEUES = (
    Broadcast(name='bcast', exchange=exchange),
)

现在您可以使用

启动 worker
celery worker -l info -A tasks -Q bcast