Spring 集成路由器行为从 4.3.10.RELEASE 到 5.1.5.RELEASE

Spring integration router behavior from 4.3.10.RELEASE to 5.1.5.RELEASE

我有一个基于 Spring 集成的应用程序,该应用程序的一部分具有以下配置。

它在 4.3 版本中运行良好。10.RELEASE。但是现在我正在尝试将版本升级到 5.1。5.RELEASE 并且应用程序无法启动,出现下面提到的异常。

我能够通过更改配置来解决问题,但我想知道为什么会发生这种变化

在 4.3 中工作的旧配置。10.RELEASE:

@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
                                StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
                                AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
    return flow -> flow.channel(inboundChannel)
            .channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
            .publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
                    flow1.handle(statusUpdater, "handle"))
                    .subscribe(flow2 -> flow2.transform(myTransformer)
                            .filter(myFilter, "filter")
                            .route(messageRouter)
                            .handle(inlineHandler, "handle")
                    ).errorHandler(new MessagePublishingErrorHandler())
            );
}

上述 5.1 版配置的日志中出现异常。5.RELEASE:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. at org.springframework.integration.dsl.IntegrationFlowDefinition.registerOutputChannelIfCan(IntegrationFlowDefinition.java:3088) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.register(IntegrationFlowDefinition.java:3033) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1185) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1001) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:979) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$null(IntegrationConfig.java:42) ~[classes/:na] at org.springframework.integration.dsl.PublishSubscribeSpec.subscribe(PublishSubscribeSpec.java:58) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$null(IntegrationConfig.java:38) ~[classes/:na] at org.springframework.integration.dsl.IntegrationFlowDefinition.publishSubscribeChannel(IntegrationFlowDefinition.java:255) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$mainFlow(IntegrationConfig.java:36) ~[classes/:na] at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.processIntegrationFlowImpl(IntegrationFlowBeanPostProcessor.java:309) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.postProcessBeforeInitialization(IntegrationFlowBeanPostProcessor.java:117) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:414) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]

更新了适用于 5.1 的配置。5.RELEASE:

inlineHandler bean 中发生的处理现在发生在它自己的 channel/flow 中,而不是内联。我不得不更改 messageRouter bean 将消息路由到早些时候内联发生的默认流。

@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
                                StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
                                AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
    return flow -> flow.channel(inboundChannel)
            .channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
            .publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
                    flow1.handle(statusUpdater, "handle"))
                    .subscribe(flow2 -> flow2.transform(myTransformer)
                            .filter(myFilter, "filter")
                            .route(messageRouter)
                            //.handle(inlineHandler, "handle") // removed this
                    ).errorHandler(new MessagePublishingErrorHandler())
            );
}


@Bean
public AbstractMessageRouter messageRouter(@Qualifier("swift") MessageChannel messageChannel, MessageChannel defaultFlow) {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            System.out.println(">>>>>> Determining channel");
            if(//some cond) {
                 return Collections.singleton(defaultFlow);
              }
            return Collections.singleton(messageChannel);
        }
    };
}

I am wondering why this has been changed

好吧,让我们再看看错误信息:

The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.

现在让我们来看看路由器并了解它的不同之处,例如来自服务激活器:

https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#router-implementations

https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#service-activator

如您所见,路由器有一个 mapping 来确定将消息发送到的进一步通道。在您的旧配置中:

 .route(messageRouter)
 .handle(inlineHandler, "handle")

您刚刚为 default channel 提供了提供此方法链的无条件映射。然而,在那个旧版本中,我们忽略了一个事实,即回退到默认输出并不是默认的(参见 resolutionRequired = true 选项)。因此,我们不能假设 end-users 总是希望回退到主流。更重要的是,它给人的印象是你总是可以在流程的中间配置路由器。从版本 5.0 开始,我们将 IntegrationFlowDefinition 与路由器性质保持一致,并引入了一个选项来明确假设:

/**
 * Make a default output mapping of the router to the parent flow.
 * Use the next, after router, parent flow {@link MessageChannel} as a
 * {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
 * @return the router spec.
 */
public S defaultOutputToParentFlow() {

并且有一个关于此事的文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-routers