从并行的 Celery 任务执行中收集结果
Collect results from parallel Celery task executions
我正在尝试实现一个相当简单的 Celery 工作流程,在该工作流程中,我收到了对同一任务的多次并行调用的结果作为元组(或列表)。
@app.task
def add(x, y):
return x + y
@app.task
def master():
return group(add.s(1, 2), add.s(3, 4))()
据此,我想以通用方式 检索(3, 7)
,即不依赖于工作流本身的方式。我正在寻找某种 "reduce async result graph to primitives" 操作。我已经尝试了以下内容(为简洁起见,我已将结果 ID 替换为 #num
)
r = master.delay()
r.get() # <GroupResult: #1 [#2, #3]>
r.collect() # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>),
# (<GroupResult: #1 [#2, #3]>, [3, 7])
# (<AsyncResult: #2>, 3),
# (<GroupResult: #3>, 7)]
r.get()
returns 两个 AsyncResult
ID 的包装器,所以我必须递归地处理每个 ID。 r.collect()
很接近,但是递归太深了。
我可以做类似的事情
r.children[0].get()
但这不是通用的,因为它明确取决于结果图的结构。此外,我可以遍历 r.collect()
直到找到一个元组,其值不是 ResultBase
的实例,例如
next(value for _, value in r.collect() if not isinstance(value, ResultBase))
但我不确定这是否在所有情况下都是正确的,我希望有更优雅的方法来做到这一点。
如果有一种方法可以重组 master
任务以使检索结果更容易,我愿意接受,只要并行启动子任务即可。任何建议,将不胜感激。提前谢谢你。
EDIT 一个相关的问题是,如果我想以非阻塞方式检索任务结果(例如,通过在调用 r.status
之前手动轮询 r.get()
或 r.collect()
,我不能简单地这样做
r = master.delay()
# some time later...
if r.status in READY_STATES:
r.get()
因为 r
是解析为 GroupResult
的 AsyncResult
,即它在 GroupResult
或其子项之前完成。有没有办法以 "skips" 顶级 AsyncResult
的方式调用组?这将解决这两个问题,因为 r.status
和 r.get()
将分别反映子任务的状态和价值。
当然,正确的解决方案原来是最简单的:调用master
作为函数在当前进程中执行。
r = master()
r.get() # [3, 7]
r.collect() # [(<GroupResult: #1 [#2, #3]>, [3, 7]),
# (<AsyncResult: #2>, 3),
# (<AsyncResult: #3>, 7)]
不是将 group
启动代码延迟到工作进程,而是在当前进程中启动。由于 group
是完全异步的,因此行为不会改变,性能也会提高。
我正在尝试实现一个相当简单的 Celery 工作流程,在该工作流程中,我收到了对同一任务的多次并行调用的结果作为元组(或列表)。
@app.task
def add(x, y):
return x + y
@app.task
def master():
return group(add.s(1, 2), add.s(3, 4))()
据此,我想以通用方式 检索(3, 7)
,即不依赖于工作流本身的方式。我正在寻找某种 "reduce async result graph to primitives" 操作。我已经尝试了以下内容(为简洁起见,我已将结果 ID 替换为 #num
)
r = master.delay()
r.get() # <GroupResult: #1 [#2, #3]>
r.collect() # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>),
# (<GroupResult: #1 [#2, #3]>, [3, 7])
# (<AsyncResult: #2>, 3),
# (<GroupResult: #3>, 7)]
r.get()
returns 两个 AsyncResult
ID 的包装器,所以我必须递归地处理每个 ID。 r.collect()
很接近,但是递归太深了。
我可以做类似的事情
r.children[0].get()
但这不是通用的,因为它明确取决于结果图的结构。此外,我可以遍历 r.collect()
直到找到一个元组,其值不是 ResultBase
的实例,例如
next(value for _, value in r.collect() if not isinstance(value, ResultBase))
但我不确定这是否在所有情况下都是正确的,我希望有更优雅的方法来做到这一点。
如果有一种方法可以重组 master
任务以使检索结果更容易,我愿意接受,只要并行启动子任务即可。任何建议,将不胜感激。提前谢谢你。
EDIT 一个相关的问题是,如果我想以非阻塞方式检索任务结果(例如,通过在调用 r.status
之前手动轮询 r.get()
或 r.collect()
,我不能简单地这样做
r = master.delay()
# some time later...
if r.status in READY_STATES:
r.get()
因为 r
是解析为 GroupResult
的 AsyncResult
,即它在 GroupResult
或其子项之前完成。有没有办法以 "skips" 顶级 AsyncResult
的方式调用组?这将解决这两个问题,因为 r.status
和 r.get()
将分别反映子任务的状态和价值。
当然,正确的解决方案原来是最简单的:调用master
作为函数在当前进程中执行。
r = master()
r.get() # [3, 7]
r.collect() # [(<GroupResult: #1 [#2, #3]>, [3, 7]),
# (<AsyncResult: #2>, 3),
# (<AsyncResult: #3>, 7)]
不是将 group
启动代码延迟到工作进程,而是在当前进程中启动。由于 group
是完全异步的,因此行为不会改变,性能也会提高。