芹菜任务没有完全从队列中删除

Celery tasks not being completely removed from queue

我有一个自动调用的 celery 任务(do_stuff.apply_async(queue="foo"))。以前我有 运行 app.control.add_consumer("foo", reply=True) 所以我的工人可以从这个队列中消费。

一段时间后,我想停止该队列中的所有任务以及从 do_stuff 启动的所有 运行ning 任务。

所以我运行这个代码:

app.control.cancel_consumer("foo", reply=True)

i = app.control.inspect()
for queue in [i.active, i.scheduled, i.reserved]:
    for worker_name, worker_tasks in queue().items():
        for task in worker_tasks:
            args = ast.literal_eval(task["args"])
            if "do_stuff" in task["name"] and args[0] == crawler.name:
                app.control.revoke(task["id"], terminate=True)

这"works"种。它确实停止了来自 do_stuff 的所有 运行ning 任务,并且它确实清除了计划任务(或者至少在 运行ning 这段代码之后我在 Flower 中看不到任何任务)。

问题是,如果我再次 运行 app.control.add_consumer("foo", reply=True),没有 运行ning 任何其他内容,新任务将开始 运行ning。这意味着 celery/redis 以某种方式设法将任务保留在某个地方。

为什么会这样?那些 "hidden" 任务保存在哪里?我该如何删除它们?

回答我自己的问题:发生这种情况是因为当我让工作人员不从队列中消费时(通过调用 cancel_consumer),队列本身仍然包含所有内容。

我找到了一种(以编程方式)刷新队列的方法:

from celery.bin.celery import CeleryCommand
cmd = CeleryCommand()
super(CeleryCommand, cmd).execute_from_commandline([
    '',
    'purge',
    '-f',
    '-Q', queue_name,
    '-A', 'main'
])