获取 celery AsyncResult 的任务名称

get task name of celery AsyncResult

我正在使用 celery 来执行我的异步任务,我想要实现的是在我执行它之后获取工作流中每个任务的名称和 ID。

        exec_workflow = chain(
            task1.si(),
            task2.si(),
            task3.si()
    )

    result = exec_workflow.apply_async()

    tasks = []
    for t in result._parents():
        tasks.append({"id": t.id, "name": t.name})

但由于某些奇怪的原因,AsyncResult 似乎没有名称 属性。知道什么是合适的方法吗?

一种不同的方法可能会在我执行 apply_async 之前在每个任务上强制使用一个 id,这将解决我的问题,因为我将能够将 id 与任务名称相匹配。但我不确定是否可行。

谢谢。

好的,我的问题已经解决了。我最终所做的只是设置每个任务的 id 属性。

不是最好的解决方案,但它有效。

result = signature.apply_async()
result._cache['task_name']
#'procedures.tasks.stop'

Celery 中有一个配置选项 result_extended 用于此目的(默认设置为 False)。

Enables extended task result attributes (name, args, kwargs, worker, retries, queue, delivery_info) to be written to backend.

参考: https://docs.celeryproject.org/en/master/userguide/configuration.html#result-extended

消费者示例(工人)

from typing import Final

from celery import Celery

app: Final = Celery(
    broker="amqp://...",
    result_backend="redis://...",
    result_extended=True,
)

@app.task(
    name="foo-service:bar"
)
def _() -> int:
    return 42

生产者示例(客户端)

from pprint import pprint
from typing import Final

from celery import Celery
from celery.result import AsyncResult

app: Final = Celery(broker="amqp://...", result_backend="redis://...")

result: AsyncResult = app.send_task("foo-service:bar")
assert result.get() == 42
assert result.name == "foo-service:bar"
assert result.queue == ...
assert result.args == ...
assert result.kwargs == ...
assert result.worker == ...
pprint(result.__dict__)