Celery - 如何通过任务 ID 获取任务名称?

Celery - how to get task name by task id?

Celery - 底线:我想通过任务 ID 获取任务名称(我没有任务对象)

假设我有这个代码:

res = chain(add.s(4,5), add.s(10)).delay()
cache.save_task_id(res.task_id)

然后在其他地方:

task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')

我知道如果我有一个任务对象就可以获得任务名称,就像这里:。 但是我没有任务对象(也许它可以通过 task_id 或其他方式创建?我在文档中没有看到任何与此相关的内容)。

另外,我不想在缓存中保存任务名称。 (假设我有一个很长的 chain/other celery primitives,我不想保存它们的所有 names/task_ids。只需要最后一个 task_id 就足以获取有关所有任务的所有信息, 使用 .parents 等)

我查看了 AsyncResult 和 AsyncResult.Backend 对象的所有相关方法。唯一似乎相关的是 backend.get_task_meta(task_id),但它不包含任务名称。 提前致谢

PS: AsyncResult.name 总是 returns None:

result = AsyncResult(task_id, app=celery_app)
result.name #Returns None
result.args #Also returns None

像下面这样的东西(伪代码)应该足够了:

app = Celery("myapp")  # add your parameters here
task_id = "6dc5f968-3554-49c9-9e00-df8aaf9e7eb5"
aresult = app.AsyncResult(task_id)
task_name = aresult.name
task_args = aresult.args
print(task_name, task_args)

不幸的是,它不起作用(我会说这是 Celery 中的一个错误),所以我们必须找到一个替代方案。我首先想到的是 Celery CLI 具有 inspect query_task 功能,这暗示我可以使用检查 API 来查找任务名称,我是对的。这是代码:

# Since the expected way does not work we need to use the inspect API:
insp = app.control.inspect()
task_ids = [task_id]
inspect_result = insp.query_task(*task_ids)
# print(inspect_result)
for node_name in inspect_result:
    val = inspect_result[node_name]
    if val:
        # we found node that executes the task
        arr = val[task_id]
        state = arr[0]
        meta = arr[1]
        task_name = meta["name"]
        task_args = meta["args"]
        print(task_name, task_args)

这种方法的问题在于它仅在任务 运行 时有效。一旦完成,您将无法使用上面的代码。

终于找到答案了。 对于任何想知道的人: 您可以通过在您的芹菜配置中启用 result_extended = True 来解决这个问题。 那么:

result = AsyncResult(task_id, app=celery_app)
result.task_name #tasks.add

celery.result.AsyncResult but not all the properties are populated unless you enable result_extended = True as per configuration docs 的文档中这不是很清楚:

result_extended

Default: False

Enables extended task result attributes (name, args, kwargs, worker, retries, queue, delivery_info) to be written to backend.

然后以下将起作用:

result = AsyncResult(task_id)
result.name = 'project.tasks.my_task'
result.args = [2, 3]
result.kwargs = {'a': 'b'}

另请注意,rpc:// 后端不存储此数据,您将需要 Redis 或类似工具。如果您使用的是 rpc,即使使用 result_extended = True,您仍然会返回 None

我在这个 code snippet 中找到了一个很好的答案。

如果你有一个 AsyncResult 的实例,你不需要 task_id,你可以简单地这样做:


result # instance of AsyncResult
result_meta = result._get_task_meta()
task_name = result_meta.get("task_name")

当然这依赖于私有方法,所以有点hacky。我希望 celery 引入一种更简单的方法来检索它 - 它对测试特别有用。