Python Celery - 如果队列不可用则引发
Python Celery - Raise if queue is not available
我在我的 Celery 配置中定义了一个路由:
task_routes = {'tasks.add': {'queue': 'calculate'}}
这样只有特定的工作人员才能 运行 该任务。我开始我的工人:
celery -A myproj worker -n worker1@%h -Q calculate
然后运行我的任务:
add.apply_async((2, 2), time_limit=5)
一切顺利。但是现在,假设我的员工去世了,我又尝试 运行 我的任务。它挂了,永远。 Time_limit 对我没有任何好处,因为任务永远不会进入队列。在这种情况下如何定义超时?换句话说,如果在接下来的 X 秒内没有队列可用,我想抛出一个错误。这可能吗?
我假设您使用 Rabbitmq 作为消息代理,如果您使用的话,关于 Rabbitmq(和其他类似 AMQP 的消息队列)的工作方式有一些微妙之处。
首先,当您发送消息时,您的进程将其发送到交换器,交换器又将消息路由到 0 个或多个队列。您的队列可能有也可能没有消费者(即 celery 工作人员)消费消息,但作为发送方,您无法控制接收方,除非该工作人员主动回复。
但是,我认为可以通过执行以下操作来实现您想要的(假设您有后端)
- 确保使用您选择的 Message TTL 声明您的队列(假设为 60 秒)。如果没有附加消费者,还要确保它没有被声明为删除。同时声明死信交换。
- 让一个 celery worker 听你的死信交换,但是那个 worker 在收到消息时会引发适当的异常。这里最简单的可能是收听消息,但不加载任何任务。这样,它会导致您的后端出现故障,说明未实现的任务。
如果你原来的工人死了,队列中的任何消息都会在你选择的 TTL 后过期并被发送到你的死信交换,此时第二个工人(自动失败的)将收到消息并提出任务失败。
请注意,您需要将 TTL 设置得远高于您希望消息在 Rabbitmq 队列中停留的时间,因为无论是否有工作人员从队列中消费,它都会过期。
要设置第一个队列,我认为您需要如下配置:
Queue(
default_queue_name,
default_exchange,
routing_key=default_routing_key,
queue_arguments={
'x-message-ttl': 60000 # milliseconds
'x-dead-letter-exchange': deadletter_exchange_name,
'x-dead-letter-routing-key': deadletter_routing_key
})
死信队列看起来更像是标准的 celery worker 队列配置,但您可能希望为其单独配置,因为您不想为该 worker 加载任何任务。
综上所述,是的,这是可能的,但并不像人们想象的那么简单。
我在我的 Celery 配置中定义了一个路由:
task_routes = {'tasks.add': {'queue': 'calculate'}}
这样只有特定的工作人员才能 运行 该任务。我开始我的工人:
celery -A myproj worker -n worker1@%h -Q calculate
然后运行我的任务:
add.apply_async((2, 2), time_limit=5)
一切顺利。但是现在,假设我的员工去世了,我又尝试 运行 我的任务。它挂了,永远。 Time_limit 对我没有任何好处,因为任务永远不会进入队列。在这种情况下如何定义超时?换句话说,如果在接下来的 X 秒内没有队列可用,我想抛出一个错误。这可能吗?
我假设您使用 Rabbitmq 作为消息代理,如果您使用的话,关于 Rabbitmq(和其他类似 AMQP 的消息队列)的工作方式有一些微妙之处。 首先,当您发送消息时,您的进程将其发送到交换器,交换器又将消息路由到 0 个或多个队列。您的队列可能有也可能没有消费者(即 celery 工作人员)消费消息,但作为发送方,您无法控制接收方,除非该工作人员主动回复。
但是,我认为可以通过执行以下操作来实现您想要的(假设您有后端)
- 确保使用您选择的 Message TTL 声明您的队列(假设为 60 秒)。如果没有附加消费者,还要确保它没有被声明为删除。同时声明死信交换。
- 让一个 celery worker 听你的死信交换,但是那个 worker 在收到消息时会引发适当的异常。这里最简单的可能是收听消息,但不加载任何任务。这样,它会导致您的后端出现故障,说明未实现的任务。
如果你原来的工人死了,队列中的任何消息都会在你选择的 TTL 后过期并被发送到你的死信交换,此时第二个工人(自动失败的)将收到消息并提出任务失败。 请注意,您需要将 TTL 设置得远高于您希望消息在 Rabbitmq 队列中停留的时间,因为无论是否有工作人员从队列中消费,它都会过期。
要设置第一个队列,我认为您需要如下配置:
Queue(
default_queue_name,
default_exchange,
routing_key=default_routing_key,
queue_arguments={
'x-message-ttl': 60000 # milliseconds
'x-dead-letter-exchange': deadletter_exchange_name,
'x-dead-letter-routing-key': deadletter_routing_key
})
死信队列看起来更像是标准的 celery worker 队列配置,但您可能希望为其单独配置,因为您不想为该 worker 加载任何任务。
综上所述,是的,这是可能的,但并不像人们想象的那么简单。