Celery - 如何通过任务 ID 获取任务名称?
Celery - how to get task name by task id?
Celery - 底线:我想通过任务 ID 获取任务名称(我没有任务对象)
假设我有这个代码:
res = chain(add.s(4,5), add.s(10)).delay()
cache.save_task_id(res.task_id)
然后在其他地方:
task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')
我知道如果我有一个任务对象就可以获得任务名称,就像这里:。
但是我没有任务对象(也许它可以通过 task_id 或其他方式创建?我在文档中没有看到任何与此相关的内容)。
另外,我不想在缓存中保存任务名称。 (假设我有一个很长的 chain/other celery primitives,我不想保存它们的所有 names/task_ids。只需要最后一个 task_id 就足以获取有关所有任务的所有信息, 使用 .parents 等)
我查看了 AsyncResult 和 AsyncResult.Backend 对象的所有相关方法。唯一似乎相关的是 backend.get_task_meta(task_id),但它不包含任务名称。
提前致谢
PS: AsyncResult.name 总是 returns None:
result = AsyncResult(task_id, app=celery_app)
result.name #Returns None
result.args #Also returns None
像下面这样的东西(伪代码)应该足够了:
app = Celery("myapp") # add your parameters here
task_id = "6dc5f968-3554-49c9-9e00-df8aaf9e7eb5"
aresult = app.AsyncResult(task_id)
task_name = aresult.name
task_args = aresult.args
print(task_name, task_args)
不幸的是,它不起作用(我会说这是 Celery 中的一个错误),所以我们必须找到一个替代方案。我首先想到的是 Celery CLI 具有 inspect query_task
功能,这暗示我可以使用检查 API 来查找任务名称,我是对的。这是代码:
# Since the expected way does not work we need to use the inspect API:
insp = app.control.inspect()
task_ids = [task_id]
inspect_result = insp.query_task(*task_ids)
# print(inspect_result)
for node_name in inspect_result:
val = inspect_result[node_name]
if val:
# we found node that executes the task
arr = val[task_id]
state = arr[0]
meta = arr[1]
task_name = meta["name"]
task_args = meta["args"]
print(task_name, task_args)
这种方法的问题在于它仅在任务 运行 时有效。一旦完成,您将无法使用上面的代码。
终于找到答案了。
对于任何想知道的人:
您可以通过在您的芹菜配置中启用 result_extended = True
来解决这个问题。
那么:
result = AsyncResult(task_id, app=celery_app)
result.task_name #tasks.add
celery.result.AsyncResult but not all the properties are populated unless you enable result_extended = True
as per configuration docs 的文档中这不是很清楚:
result_extended
Default: False
Enables extended task result attributes (name, args, kwargs, worker, retries, queue, delivery_info) to be written to backend.
然后以下将起作用:
result = AsyncResult(task_id)
result.name = 'project.tasks.my_task'
result.args = [2, 3]
result.kwargs = {'a': 'b'}
另请注意,rpc:// 后端不存储此数据,您将需要 Redis 或类似工具。如果您使用的是 rpc,即使使用 result_extended = True
,您仍然会返回 None
。
我在这个 code snippet 中找到了一个很好的答案。
如果你有一个 AsyncResult
的实例,你不需要 task_id,你可以简单地这样做:
result # instance of AsyncResult
result_meta = result._get_task_meta()
task_name = result_meta.get("task_name")
当然这依赖于私有方法,所以有点hacky。我希望 celery 引入一种更简单的方法来检索它 - 它对测试特别有用。
Celery - 底线:我想通过任务 ID 获取任务名称(我没有任务对象)
假设我有这个代码:
res = chain(add.s(4,5), add.s(10)).delay()
cache.save_task_id(res.task_id)
然后在其他地方:
task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')
我知道如果我有一个任务对象就可以获得任务名称,就像这里:
另外,我不想在缓存中保存任务名称。 (假设我有一个很长的 chain/other celery primitives,我不想保存它们的所有 names/task_ids。只需要最后一个 task_id 就足以获取有关所有任务的所有信息, 使用 .parents 等)
我查看了 AsyncResult 和 AsyncResult.Backend 对象的所有相关方法。唯一似乎相关的是 backend.get_task_meta(task_id),但它不包含任务名称。 提前致谢
PS: AsyncResult.name 总是 returns None:
result = AsyncResult(task_id, app=celery_app)
result.name #Returns None
result.args #Also returns None
像下面这样的东西(伪代码)应该足够了:
app = Celery("myapp") # add your parameters here
task_id = "6dc5f968-3554-49c9-9e00-df8aaf9e7eb5"
aresult = app.AsyncResult(task_id)
task_name = aresult.name
task_args = aresult.args
print(task_name, task_args)
不幸的是,它不起作用(我会说这是 Celery 中的一个错误),所以我们必须找到一个替代方案。我首先想到的是 Celery CLI 具有 inspect query_task
功能,这暗示我可以使用检查 API 来查找任务名称,我是对的。这是代码:
# Since the expected way does not work we need to use the inspect API:
insp = app.control.inspect()
task_ids = [task_id]
inspect_result = insp.query_task(*task_ids)
# print(inspect_result)
for node_name in inspect_result:
val = inspect_result[node_name]
if val:
# we found node that executes the task
arr = val[task_id]
state = arr[0]
meta = arr[1]
task_name = meta["name"]
task_args = meta["args"]
print(task_name, task_args)
这种方法的问题在于它仅在任务 运行 时有效。一旦完成,您将无法使用上面的代码。
终于找到答案了。
对于任何想知道的人:
您可以通过在您的芹菜配置中启用 result_extended = True
来解决这个问题。
那么:
result = AsyncResult(task_id, app=celery_app)
result.task_name #tasks.add
celery.result.AsyncResult but not all the properties are populated unless you enable result_extended = True
as per configuration docs 的文档中这不是很清楚:
result_extended
Default: False
Enables extended task result attributes (name, args, kwargs, worker, retries, queue, delivery_info) to be written to backend.
然后以下将起作用:
result = AsyncResult(task_id)
result.name = 'project.tasks.my_task'
result.args = [2, 3]
result.kwargs = {'a': 'b'}
另请注意,rpc:// 后端不存储此数据,您将需要 Redis 或类似工具。如果您使用的是 rpc,即使使用 result_extended = True
,您仍然会返回 None
。
我在这个 code snippet 中找到了一个很好的答案。
如果你有一个 AsyncResult
的实例,你不需要 task_id,你可以简单地这样做:
result # instance of AsyncResult
result_meta = result._get_task_meta()
task_name = result_meta.get("task_name")
当然这依赖于私有方法,所以有点hacky。我希望 celery 引入一种更简单的方法来检索它 - 它对测试特别有用。