Celery 从链式任务生成组任务

Celery Generating group tasks from chain task

我正在尝试使用 celery(v4.0) 链接以下任务,

task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()

以上部分作为和弦在 generate_job_requests 之前工作正常。 但是问题从 execute_job 开始,它从 generate_job_requests 获取作业列表,为此我需要创建并行任务,然后再创建所有作业的聚合结果。

我正在尝试验证芹菜是否可以使用这种任务图?是否有任何可能的替代工作流程来解决具有这种依赖性的问题? 我在文档中遗漏的任何内容。

我将类似地图的功能与中间任务创建器一起使用,它的作用类似于和弦,

@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
    callback = subtask(callback)
    grp = group(callback.clone([arg, ]) for arg in it)
    c = (grp | end_task)
    return c()

所以任务流程减少了,

task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s(
        execute_job.s(), aggregate_result.s())).apply_async()

为了获得任务的最终输出,我做了一些调整,

# As we get dmap task id here
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# Get actual aggregate_result task id
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# Here we receive actual output of overall task
result.get()

我提到了 answer