并行流中的多个 Scattergathers 运行

Multiple Scattergathers in flow run in parallel

我确实有一个要求,我需要调用 3 个 API,其中 API 之一依赖于其他的输出。 如果我们说我们有 3 APIs A、B 和 C A 和 B 可以 运行 并行,但 C 取决于 B 的输出。

下面是伪代码。

public IntegrationFlow mainFlow()
    {
        return flow -> flow
                .scatterGather(scatterRequests(), aggregateResponse())
                .enrichHeaders(s -> s.headerExpressions(h -> h.put(AGGREGATED_RESPONSE, "payload")))
                .scatterGather(scatterOtherRequests(), aggregateResponse());
    }

private Consumer<RecipientListRouterSpec> scatterRequests()
         {
            return scatterer -> scatterer
                .applySequence(true)
                .recipientFlow(flowA())
                .recipientFlow(flowB());
        }
    
    private Consumer<RecipientListRouterSpec> scatterOtherRequests()
        {
            return scatterer -> scatterer
                .applySequence(true)
                .recipientFlow(flowC())
                .recipientFlow(aggregatedResponseSubFlow());
        }
    where aggregatedResponseSubFlow is as below
 private IntegrationFlow raggregatedResponseSubFlow() 
    {
         return subflow -> subflow
                 .handle((payload, headers) -> {
                     return headers.get(AGGREGATED_RESPONSE);
                 });
    }
    

尽管有 2 个 Scattergathers 一个接一个,但它们 运行 是并行的,因此我得到了不一致的结果。那么我怎样才能 运行 它顺序呢。即在第一个执行后,第二个应该开始。如果无法完成,该场景还能如何处理?

注意:我确实需要汇总来自所有 3 个服务的响应

您的代码片段中没有任何内容可以确认进程是并行完成的。但可能您只是为了简化向我们展示的代码而删除了该部分。我假设是因为不清楚 scatterOtherRequests() 是什么,但您显示的是 scatterPendingRequests()

无论如何,最好重新考虑一下您的解决方案。听起来更像 AB1 & B2 并像这样并行处理:

  A
<
  B1 -> B2

因此听起来您不需要第二次分散-聚集。 即使您在主流中需要一个 B1 结果,您仍然可以在回复主流之前将其与该分支中的 B2 以某种方式组合。