使用倒计时 task.apply_async 后,Celery 任务未出现在队列中
Celery task does not appear in queue after task.apply_async using countdown
我希望这个问题符合堆栈溢出的条件。
使用 celery 4.2.0 和 Redis 作为代理和后端。
有任务
@shared_task()
def add(a, b):
return a+b
当 worker 处于活动状态时,运行 休假命令:
add.apply_async(countdown=60)
导致任务没有注册到默认的celery队列,
但在 countdown
中规定的时间段后仍在执行
为什么会这样,我如何查找所有待处理的任务?
如果将任务注册到队列中,这样做会奏效:
with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange('celery', 0, -1)
如果我在任务还没有开始时终止 worker,我会得到休假:
[WARNING/MainProcess] Restoring 1 unacknowledged message(s)
这告诉我任务保存在队列以外的其他地方,但我不知道在哪里
事实证明,这实际上是预期的行为,因为工作人员甚至在执行之前就抓取挂起的任务以加快速度,从而将它们从队列中移除。
为了实现我想要的,即禁止工作人员在实际开始处理任务之前从队列中获取任务,并失去对该任务的跟踪,我不得不同时使用 2 个 celery 设置
task_acks_late = True
worker_prefetch_multiplier = 1
通过这种方式,任务从其原始队列中删除,但仍存在于名为 'unacked' 的队列中,这使我可以监视它。 Optimizing celery
请注意,使用 'acks_late' acks late 设置有一些副作用,您需要注意,例如,如果工作人员意外终止,任务将恢复并重新 运行 下一次工人复活。
我希望这个问题符合堆栈溢出的条件。
使用 celery 4.2.0 和 Redis 作为代理和后端。
有任务
@shared_task()
def add(a, b):
return a+b
当 worker 处于活动状态时,运行 休假命令:
add.apply_async(countdown=60)
导致任务没有注册到默认的celery队列, 但在 countdown
中规定的时间段后仍在执行为什么会这样,我如何查找所有待处理的任务? 如果将任务注册到队列中,这样做会奏效:
with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange('celery', 0, -1)
如果我在任务还没有开始时终止 worker,我会得到休假:
[WARNING/MainProcess] Restoring 1 unacknowledged message(s)
这告诉我任务保存在队列以外的其他地方,但我不知道在哪里
事实证明,这实际上是预期的行为,因为工作人员甚至在执行之前就抓取挂起的任务以加快速度,从而将它们从队列中移除。
为了实现我想要的,即禁止工作人员在实际开始处理任务之前从队列中获取任务,并失去对该任务的跟踪,我不得不同时使用 2 个 celery 设置
task_acks_late = True
worker_prefetch_multiplier = 1
通过这种方式,任务从其原始队列中删除,但仍存在于名为 'unacked' 的队列中,这使我可以监视它。 Optimizing celery
请注意,使用 'acks_late' acks late 设置有一些副作用,您需要注意,例如,如果工作人员意外终止,任务将恢复并重新 运行 下一次工人复活。