从并行的 Celery 任务执行中收集结果

Collect results from parallel Celery task executions

我正在尝试实现一个相当简单的 Celery 工作流程,在该工作流程中,我收到了对同一任务的多次并行调用的结果作为元组(或列表)。

@app.task
def add(x, y):
    return x + y

@app.task
def master():
    return group(add.s(1, 2), add.s(3, 4))()

据此,我想以通用方式 检索(3, 7) ,即不依赖于工作流本身的方式。我正在寻找某种 "reduce async result graph to primitives" 操作。我已经尝试了以下内容(为简洁起见,我已将结果 ID 替换为 #num

r = master.delay()
r.get()      # <GroupResult: #1 [#2, #3]>
r.collect()  # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>),
             #  (<GroupResult: #1 [#2, #3]>, [3, 7])
             #  (<AsyncResult: #2>, 3),
             #  (<GroupResult: #3>, 7)]

r.get() returns 两个 AsyncResult ID 的包装器,所以我必须递归地处理每个 ID。 r.collect() 很接近,但是递归太深了。

我可以做类似的事情

r.children[0].get()

但这不是通用的,因为它明确取决于结果图的结构。此外,我可以遍历 r.collect() 直到找到一个元组,其值不是 ResultBase 的实例,例如

next(value for _, value in r.collect() if not isinstance(value, ResultBase))

但我不确定这是否在所有情况下都是正确的,我希望有更优雅的方法来做到这一点。

如果有一种方法可以重组 master 任务以使检索结果更容易,我愿意接受,只要并行启动子任务即可。任何建议,将不胜感激。提前谢谢你。


EDIT 一个相关的问题是,如果我想以非阻塞方式检索任务结果(例如,通过在调用 r.status 之前手动轮询 r.get()r.collect(),我不能简单地这样做

r = master.delay()

# some time later...
if r.status in READY_STATES:
    r.get()

因为 r 是解析为 GroupResultAsyncResult,即它在 GroupResult 或其子项之前完成。有没有办法以 "skips" 顶级 AsyncResult 的方式调用组?这将解决这两个问题,因为 r.statusr.get() 将分别反映子任务的状态和价值。

当然,正确的解决方案原来是最简单的:调用master作为函数在当前进程中执行。

r = master()
r.get()      # [3, 7]
r.collect()  # [(<GroupResult: #1 [#2, #3]>, [3, 7]),
             #  (<AsyncResult: #2>, 3),
             #  (<AsyncResult: #3>, 7)]

不是将 group 启动代码延迟到工作进程,而是在当前进程中启动。由于 group 是完全异步的,因此行为不会改变,性能也会提高。