Celery 从链式任务生成组任务
Celery Generating group tasks from chain task
我正在尝试使用 celery(v4.0) 链接以下任务,
task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()
以上部分作为和弦在 generate_job_requests
之前工作正常。
但是问题从 execute_job
开始,它从 generate_job_requests
获取作业列表,为此我需要创建并行任务,然后再创建所有作业的聚合结果。
我正在尝试验证芹菜是否可以使用这种任务图?是否有任何可能的替代工作流程来解决具有这种依赖性的问题?
我在文档中遗漏的任何内容。
我将类似地图的功能与中间任务创建器一起使用,它的作用类似于和弦,
@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
callback = subtask(callback)
grp = group(callback.clone([arg, ]) for arg in it)
c = (grp | end_task)
return c()
所以任务流程减少了,
task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s(
execute_job.s(), aggregate_result.s())).apply_async()
为了获得任务的最终输出,我做了一些调整,
# As we get dmap task id here
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# Get actual aggregate_result task id
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# Here we receive actual output of overall task
result.get()
我提到了 answer
我正在尝试使用 celery(v4.0) 链接以下任务,
task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()
以上部分作为和弦在 generate_job_requests
之前工作正常。
但是问题从 execute_job
开始,它从 generate_job_requests
获取作业列表,为此我需要创建并行任务,然后再创建所有作业的聚合结果。
我正在尝试验证芹菜是否可以使用这种任务图?是否有任何可能的替代工作流程来解决具有这种依赖性的问题? 我在文档中遗漏的任何内容。
我将类似地图的功能与中间任务创建器一起使用,它的作用类似于和弦,
@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
callback = subtask(callback)
grp = group(callback.clone([arg, ]) for arg in it)
c = (grp | end_task)
return c()
所以任务流程减少了,
task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s(
execute_job.s(), aggregate_result.s())).apply_async()
为了获得任务的最终输出,我做了一些调整,
# As we get dmap task id here
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# Get actual aggregate_result task id
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# Here we receive actual output of overall task
result.get()
我提到了 answer