Spring 集成中下行流完成后如何从队列中一次轮询 1 条消息

How to poll from a queue 1 message at a time after downstream flow is completed in Spring Integration

我目前正在努力提高集成流程的性能,试图并行化消息处理。我已经使用 Java DSL 实现了所有。

当前的集成流程从具有固定轮询器的队列通道获取消息,并通过多个处理程序依次处理消息,直到它到达最终处理程序,该处理程序会考虑前一个处理程序的每个进行一些最终计算输出。它们都连接在同一个集成流中。基本上,这些处理程序包装了对外部系统的调用。我需要在这里保留的重要一点是,在前一个消息的所有下游流完成之前,不得从队列中取出消息。我需要并行化的是处理程序。

当前集成流程: MessageQueue -> 轮询器 -> 处理程序 1 -> 处理程序 2 -> 处理程序 X -> 最终处理程序

我尝试在执行以下操作时合并并行性,并且效果很好。

MessageQueue -> Poller -> Splitter -> Executor -> Router,子流映射到不同的 Handlers -> Aggregator -> Final Handler

我发现这种方法的问题是,在前一个消息通过所有下游流之前,从队列通道中获取了一条新消息。很清楚为什么添加 Splitter 和 Executor 会改变消息的处理方式,但问题是不同消息的结果之间可能存在依赖关系。

问题是,如何像 "suspend" 轮询器那样一次从队列通道检索一条消息,直到正在处理的消息到达聚合器之后的最后一个端点?我不知道如何重新排列组件或我还能做些什么来实现这一目标。

抱歉,我试图寻找答案,但我找不到...请在这里需要一些指导。 非常感谢


@Blink 这对我有用,可能需要一些重构,我相信它可以写得更优雅。我不是专家,抱歉。

嗯,基本要素是:

  1. 包装消息系统的接口
  2. 调用网关方法时将路由消息的消息通道

    @Bean
    public DirectChannel integrationChannel() {
        return MessageChannels.direct().get();
    }
    
    @MessagingGateway
    interface WrappingGateway {
    
        @Gateway(requestChannel = "integrationChannel")
        TrackingLog executeIntegration(TrackingLog trackingLog);
    
    }
    

TrackingLog 是我用来记录下游流程结果的模型。

基本上我在集成流中调用包装网关,它从消息队列中提取消息。

@Autowired
WrappingGateway integrationGateway;

@Bean
public IntegrationFlow createCatalogueChannelFlow() {
    return IntegrationFlows.from(cataloguePriorityChannel())

            // Queue Poller
            .bridge(s -> s.poller(Pollers.fixedRate(1, TimeUnit.SECONDS).maxMessagesPerPoll(1)).autoStartup(true)
                    .id("cataloguePriorityChannelBridge"))

            // Call to Gateway 
            .handle(m -> {
                this.integrationGateway
                        .executeIntegration(((TrackingLog) m.getPayload()));
            })

            .get();
}

@Bean
public IntegrationFlow startCatalogueIntegrationChannelFlow() {
    return IntegrationFlows.from(integrationChannel())

            // Log
            .handle(trackerSupportClient, "logMessagePreExecution")

            // Set TrackingLog in message Header
            .enrichHeaders(e -> e.headerFunction("TRACKING_LOG", m -> {
                return ((TrackingLog) m.getPayload());
            }))
 ....

整个集成有点复杂,它从异步 HTTP 网关、转换器、路由器、存储在 mongodb 等开始。这里的要点是,正如@Artem Bilan 向我建议的那样,对网关的调用会阻塞线程并阻止队列轮询器获取更多消息,直到当前消息被完全处理为止。

希望对您有所帮助。

这确实是一项有趣的任务...我将与您分享我的想法,您将选择最适合您的。

  1. 我们总是可以将一部分流包装成一个@MessagingGateway,等待回复。它的子流有多异步已经无关紧要了。因此,您可以并行执行这些任务,但网关仍会等待主线程中的回复,从而阻止队列中的下一次轮询。你应该确保你 return 的东西在子流结束时进入 replyChannel 以解除对主线程的阻塞。请在此处查看文档:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/messaging-endpoints.html#gateway

  2. 我们有一个开箱即用的 BarrierMessageHandler 组件。重点确实是用消息阻塞当前线程,直到某个触发器到达消息所属的相关性为止。只有这个组件的问题是你需要弄清楚如何为第一条消息释放障碍,因为这条消息将作为下一条消息的触发器。虽然我们可能可以使用一次性路由器绕过第一条消息的障碍......文档在这里:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/message-routing.html#barrier

  3. 我们有一个像 MessageSourcePollingTemplate 这样的组件。因此,您可以在需要时调用包裹在 MessageSource lambda 中的 QueueChannel。我现在有点想不出如何将其融入流程,但这又是一个如何暂停轮询的想法。请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/core.html#deferred-acks-message-source

  4. 另一种方法是将 MethodInterceptor 添加到 Poller 配置中,以便在某些 AtomicBoolean 状态为 [=20 时跳过调用 invocation.proceed() =].通过这种方式,您可以保持状态直到消息被处理,并且每个轮询任务都将跳过,直到您重置该状态。文档:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/messaging-endpoints.html#endpoint-pollingconsumer