获取 celery AsyncResult 的任务名称
get task name of celery AsyncResult
我正在使用 celery 来执行我的异步任务,我想要实现的是在我执行它之后获取工作流中每个任务的名称和 ID。
exec_workflow = chain(
task1.si(),
task2.si(),
task3.si()
)
result = exec_workflow.apply_async()
tasks = []
for t in result._parents():
tasks.append({"id": t.id, "name": t.name})
但由于某些奇怪的原因,AsyncResult 似乎没有名称 属性。知道什么是合适的方法吗?
一种不同的方法可能会在我执行 apply_async 之前在每个任务上强制使用一个 id,这将解决我的问题,因为我将能够将 id 与任务名称相匹配。但我不确定是否可行。
谢谢。
好的,我的问题已经解决了。我最终所做的只是设置每个任务的 id 属性。
不是最好的解决方案,但它有效。
result = signature.apply_async()
result._cache['task_name']
#'procedures.tasks.stop'
Celery 中有一个配置选项 result_extended
用于此目的(默认设置为 False
)。
Enables extended task result attributes (name, args, kwargs, worker, retries, queue, delivery_info) to be written to backend.
参考:
https://docs.celeryproject.org/en/master/userguide/configuration.html#result-extended
消费者示例(工人)
from typing import Final
from celery import Celery
app: Final = Celery(
broker="amqp://...",
result_backend="redis://...",
result_extended=True,
)
@app.task(
name="foo-service:bar"
)
def _() -> int:
return 42
生产者示例(客户端)
from pprint import pprint
from typing import Final
from celery import Celery
from celery.result import AsyncResult
app: Final = Celery(broker="amqp://...", result_backend="redis://...")
result: AsyncResult = app.send_task("foo-service:bar")
assert result.get() == 42
assert result.name == "foo-service:bar"
assert result.queue == ...
assert result.args == ...
assert result.kwargs == ...
assert result.worker == ...
pprint(result.__dict__)
我正在使用 celery 来执行我的异步任务,我想要实现的是在我执行它之后获取工作流中每个任务的名称和 ID。
exec_workflow = chain(
task1.si(),
task2.si(),
task3.si()
)
result = exec_workflow.apply_async()
tasks = []
for t in result._parents():
tasks.append({"id": t.id, "name": t.name})
但由于某些奇怪的原因,AsyncResult 似乎没有名称 属性。知道什么是合适的方法吗?
一种不同的方法可能会在我执行 apply_async 之前在每个任务上强制使用一个 id,这将解决我的问题,因为我将能够将 id 与任务名称相匹配。但我不确定是否可行。
谢谢。
好的,我的问题已经解决了。我最终所做的只是设置每个任务的 id 属性。
不是最好的解决方案,但它有效。
result = signature.apply_async()
result._cache['task_name']
#'procedures.tasks.stop'
Celery 中有一个配置选项 result_extended
用于此目的(默认设置为 False
)。
Enables extended task result attributes (name, args, kwargs, worker, retries, queue, delivery_info) to be written to backend.
参考: https://docs.celeryproject.org/en/master/userguide/configuration.html#result-extended
消费者示例(工人)
from typing import Final
from celery import Celery
app: Final = Celery(
broker="amqp://...",
result_backend="redis://...",
result_extended=True,
)
@app.task(
name="foo-service:bar"
)
def _() -> int:
return 42
生产者示例(客户端)
from pprint import pprint
from typing import Final
from celery import Celery
from celery.result import AsyncResult
app: Final = Celery(broker="amqp://...", result_backend="redis://...")
result: AsyncResult = app.send_task("foo-service:bar")
assert result.get() == 42
assert result.name == "foo-service:bar"
assert result.queue == ...
assert result.args == ...
assert result.kwargs == ...
assert result.worker == ...
pprint(result.__dict__)