Spring 集成 Scatter-Gather 模式与 JMS 传输
Spring integration Scatter-Gather pattern with JMS transport
我需要实现以下架构:
我有必须使用 JMS 发送到系统(一些外部应用程序)的数据。
根据需要发送的数据只发送给必要的系统(比如系统数为4,那么可以从1发送到4)
需要等待消息发送到的系统的响应,在收到所有响应后,需要处理接收到的数据(或处理至少一个超时)
关联 ID 包含在传出和传入 JMS 消息的 header 中
每个新的此类进程都可以异步和并行启动
现在我只在 Spring JMS 的帮助下实现了它。我手动同步线程,我也手动管理线程池。
有关发送消息的系统的相关 ID 和信息存储为状态,并在收到新消息等后更新它。
但我想简化逻辑并使用 Spring-integration Java DSL、分散聚集模式(这只是我的情况)和其他有用的 Spring 功能。
你能帮我举个例子说明如何在 Spring-integration/IntregrationFlow 的帮助下实现这样的架构吗?
这是我们测试用例中的一些示例:
@Bean
public IntegrationFlow scatterGatherFlow() {
return f -> f
.scatterGather(scatterer -> scatterer
.applySequence(true)
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)),
gatherer -> gatherer
.releaseStrategy(group ->
group.size() == 3 ||
group.getMessages()
.stream()
.anyMatch(m -> (Double) m.getPayload() > 5)),
scatterGather -> scatterGather
.gatherTimeout(10_000));
}
所以,有以下部分:
scatterer
- 向收件人发送消息。在您的情况下,所有这些 JMS 服务。不过那可以是 scatterChannel
。通常 PublishSubscribeChannel
,因此 Scatter-Gather
可能无法提前知道订阅者。
gatherer
- 好吧,它只是一个 aggregator
及其所有可能的选项。
scatterGather
- 只是为了方便 ScatterGatherHandler
和通用端点选项的直接属性。
我需要实现以下架构:
我有必须使用 JMS 发送到系统(一些外部应用程序)的数据。
根据需要发送的数据只发送给必要的系统(比如系统数为4,那么可以从1发送到4)
需要等待消息发送到的系统的响应,在收到所有响应后,需要处理接收到的数据(或处理至少一个超时)
关联 ID 包含在传出和传入 JMS 消息的 header 中
每个新的此类进程都可以异步和并行启动
现在我只在 Spring JMS 的帮助下实现了它。我手动同步线程,我也手动管理线程池。
有关发送消息的系统的相关 ID 和信息存储为状态,并在收到新消息等后更新它。
但我想简化逻辑并使用 Spring-integration Java DSL、分散聚集模式(这只是我的情况)和其他有用的 Spring 功能。
你能帮我举个例子说明如何在 Spring-integration/IntregrationFlow 的帮助下实现这样的架构吗?
这是我们测试用例中的一些示例:
@Bean
public IntegrationFlow scatterGatherFlow() {
return f -> f
.scatterGather(scatterer -> scatterer
.applySequence(true)
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)),
gatherer -> gatherer
.releaseStrategy(group ->
group.size() == 3 ||
group.getMessages()
.stream()
.anyMatch(m -> (Double) m.getPayload() > 5)),
scatterGather -> scatterGather
.gatherTimeout(10_000));
}
所以,有以下部分:
scatterer
- 向收件人发送消息。在您的情况下,所有这些 JMS 服务。不过那可以是scatterChannel
。通常PublishSubscribeChannel
,因此Scatter-Gather
可能无法提前知道订阅者。gatherer
- 好吧,它只是一个aggregator
及其所有可能的选项。scatterGather
- 只是为了方便ScatterGatherHandler
和通用端点选项的直接属性。