并行流中的多个 Scattergathers 运行
Multiple Scattergathers in flow run in parallel
我确实有一个要求,我需要调用 3 个 API,其中 API 之一依赖于其他的输出。
如果我们说我们有 3 APIs A、B 和 C
A 和 B 可以 运行 并行,但 C 取决于 B 的输出。
下面是伪代码。
public IntegrationFlow mainFlow()
{
return flow -> flow
.scatterGather(scatterRequests(), aggregateResponse())
.enrichHeaders(s -> s.headerExpressions(h -> h.put(AGGREGATED_RESPONSE, "payload")))
.scatterGather(scatterOtherRequests(), aggregateResponse());
}
private Consumer<RecipientListRouterSpec> scatterRequests()
{
return scatterer -> scatterer
.applySequence(true)
.recipientFlow(flowA())
.recipientFlow(flowB());
}
private Consumer<RecipientListRouterSpec> scatterOtherRequests()
{
return scatterer -> scatterer
.applySequence(true)
.recipientFlow(flowC())
.recipientFlow(aggregatedResponseSubFlow());
}
where aggregatedResponseSubFlow is as below
private IntegrationFlow raggregatedResponseSubFlow()
{
return subflow -> subflow
.handle((payload, headers) -> {
return headers.get(AGGREGATED_RESPONSE);
});
}
尽管有 2 个 Scattergathers 一个接一个,但它们 运行 是并行的,因此我得到了不一致的结果。那么我怎样才能 运行 它顺序呢。即在第一个执行后,第二个应该开始。如果无法完成,该场景还能如何处理?
注意:我确实需要汇总来自所有 3 个服务的响应
您的代码片段中没有任何内容可以确认进程是并行完成的。但可能您只是为了简化向我们展示的代码而删除了该部分。我假设是因为不清楚 scatterOtherRequests()
是什么,但您显示的是 scatterPendingRequests()
。
无论如何,最好重新考虑一下您的解决方案。听起来更像 A
和 B1
& B2
并像这样并行处理:
A
<
B1 -> B2
因此听起来您不需要第二次分散-聚集。
即使您在主流中需要一个 B1
结果,您仍然可以在回复主流之前将其与该分支中的 B2
以某种方式组合。
我确实有一个要求,我需要调用 3 个 API,其中 API 之一依赖于其他的输出。 如果我们说我们有 3 APIs A、B 和 C A 和 B 可以 运行 并行,但 C 取决于 B 的输出。
下面是伪代码。
public IntegrationFlow mainFlow()
{
return flow -> flow
.scatterGather(scatterRequests(), aggregateResponse())
.enrichHeaders(s -> s.headerExpressions(h -> h.put(AGGREGATED_RESPONSE, "payload")))
.scatterGather(scatterOtherRequests(), aggregateResponse());
}
private Consumer<RecipientListRouterSpec> scatterRequests()
{
return scatterer -> scatterer
.applySequence(true)
.recipientFlow(flowA())
.recipientFlow(flowB());
}
private Consumer<RecipientListRouterSpec> scatterOtherRequests()
{
return scatterer -> scatterer
.applySequence(true)
.recipientFlow(flowC())
.recipientFlow(aggregatedResponseSubFlow());
}
where aggregatedResponseSubFlow is as below
private IntegrationFlow raggregatedResponseSubFlow()
{
return subflow -> subflow
.handle((payload, headers) -> {
return headers.get(AGGREGATED_RESPONSE);
});
}
尽管有 2 个 Scattergathers 一个接一个,但它们 运行 是并行的,因此我得到了不一致的结果。那么我怎样才能 运行 它顺序呢。即在第一个执行后,第二个应该开始。如果无法完成,该场景还能如何处理?
注意:我确实需要汇总来自所有 3 个服务的响应
您的代码片段中没有任何内容可以确认进程是并行完成的。但可能您只是为了简化向我们展示的代码而删除了该部分。我假设是因为不清楚 scatterOtherRequests()
是什么,但您显示的是 scatterPendingRequests()
。
无论如何,最好重新考虑一下您的解决方案。听起来更像 A
和 B1
& B2
并像这样并行处理:
A
<
B1 -> B2
因此听起来您不需要第二次分散-聚集。
即使您在主流中需要一个 B1
结果,您仍然可以在回复主流之前将其与该分支中的 B2
以某种方式组合。