Spring 集成 Scatter-Gather 模式与 JMS 传输

Spring integration Scatter-Gather pattern with JMS transport

我需要实现以下架构:

  1. 我有必须使用 JMS 发送到系统(一些外部应用程序)的数据。

  2. 根据需要发送的数据只发送给必要的系统(比如系统数为4,那么可以从1发送到4)

  3. 需要等待消息发送到的系统的响应,在收到所有响应后,需要处理接收到的数据(或处理至少一个超时)

  4. 关联 ID 包含在传出和传入 JMS 消息的 header 中

  5. 每个新的此类进程都可以异步和并行启动

现在我只在 Spring JMS 的帮助下实现了它。我手动同步线程,我也手动管理线程池。

有关发送消息的系统的相关 ID 和信息存储为状态,并在收到新消息等后更新它。

但我想简化逻辑并使用 Spring-integration Java DSL、分散聚集模式(这只是我的情况)和其他有用的 Spring 功能。

你能帮我举个例子说明如何在 Spring-integration/IntregrationFlow 的帮助下实现这样的架构吗?

这是我们测试用例中的一些示例:

    @Bean
    public IntegrationFlow scatterGatherFlow() {
        return f -> f
                .scatterGather(scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
                                .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
                                .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)),
                        gatherer -> gatherer
                                .releaseStrategy(group ->
                                        group.size() == 3 ||
                                                group.getMessages()
                                                        .stream()
                                                        .anyMatch(m -> (Double) m.getPayload() > 5)),
                        scatterGather -> scatterGather
                                .gatherTimeout(10_000));
    }

所以,有以下部分:

  • scatterer - 向收件人发送消息。在您的情况下,所有这些 JMS 服务。不过那可以是 scatterChannel。通常 PublishSubscribeChannel,因此 Scatter-Gather 可能无法提前知道订阅者。

  • gatherer - 好吧,它只是一个 aggregator 及其所有可能的选项。

  • scatterGather - 只是为了方便 ScatterGatherHandler 和通用端点选项的直接属性。