Spring 集成路由器行为从 4.3.10.RELEASE 到 5.1.5.RELEASE
Spring integration router behavior from 4.3.10.RELEASE to 5.1.5.RELEASE
我有一个基于 Spring 集成的应用程序,该应用程序的一部分具有以下配置。
它在 4.3 版本中运行良好。10.RELEASE。但是现在我正在尝试将版本升级到 5.1。5.RELEASE 并且应用程序无法启动,出现下面提到的异常。
我能够通过更改配置来解决问题,但我想知道为什么会发生这种变化
在 4.3 中工作的旧配置。10.RELEASE:
@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
return flow -> flow.channel(inboundChannel)
.channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
.publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
flow1.handle(statusUpdater, "handle"))
.subscribe(flow2 -> flow2.transform(myTransformer)
.filter(myFilter, "filter")
.route(messageRouter)
.handle(inlineHandler, "handle")
).errorHandler(new MessagePublishingErrorHandler())
);
}
上述 5.1 版配置的日志中出现异常。5.RELEASE:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
at org.springframework.integration.dsl.IntegrationFlowDefinition.registerOutputChannelIfCan(IntegrationFlowDefinition.java:3088) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.register(IntegrationFlowDefinition.java:3033) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1185) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1001) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:979) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at com.config.IntegrationConfig.lambda$null(IntegrationConfig.java:42) ~[classes/:na]
at org.springframework.integration.dsl.PublishSubscribeSpec.subscribe(PublishSubscribeSpec.java:58) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at com.config.IntegrationConfig.lambda$null(IntegrationConfig.java:38) ~[classes/:na]
at org.springframework.integration.dsl.IntegrationFlowDefinition.publishSubscribeChannel(IntegrationFlowDefinition.java:255) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at com.config.IntegrationConfig.lambda$mainFlow(IntegrationConfig.java:36) ~[classes/:na]
at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.processIntegrationFlowImpl(IntegrationFlowBeanPostProcessor.java:309) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.postProcessBeforeInitialization(IntegrationFlowBeanPostProcessor.java:117) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:414) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
更新了适用于 5.1 的配置。5.RELEASE:
inlineHandler
bean 中发生的处理现在发生在它自己的 channel/flow 中,而不是内联。我不得不更改 messageRouter
bean 将消息路由到早些时候内联发生的默认流。
@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
return flow -> flow.channel(inboundChannel)
.channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
.publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
flow1.handle(statusUpdater, "handle"))
.subscribe(flow2 -> flow2.transform(myTransformer)
.filter(myFilter, "filter")
.route(messageRouter)
//.handle(inlineHandler, "handle") // removed this
).errorHandler(new MessagePublishingErrorHandler())
);
}
@Bean
public AbstractMessageRouter messageRouter(@Qualifier("swift") MessageChannel messageChannel, MessageChannel defaultFlow) {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
System.out.println(">>>>>> Determining channel");
if(//some cond) {
return Collections.singleton(defaultFlow);
}
return Collections.singleton(messageChannel);
}
};
}
I am wondering why this has been changed
好吧,让我们再看看错误信息:
The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
现在让我们来看看路由器并了解它的不同之处,例如来自服务激活器:
如您所见,路由器有一个 mapping
来确定将消息发送到的进一步通道。在您的旧配置中:
.route(messageRouter)
.handle(inlineHandler, "handle")
您刚刚为 default channel
提供了提供此方法链的无条件映射。然而,在那个旧版本中,我们忽略了一个事实,即回退到默认输出并不是默认的(参见 resolutionRequired = true
选项)。因此,我们不能假设 end-users 总是希望回退到主流。更重要的是,它给人的印象是你总是可以在流程的中间配置路由器。从版本 5.0
开始,我们将 IntegrationFlowDefinition
与路由器性质保持一致,并引入了一个选项来明确假设:
/**
* Make a default output mapping of the router to the parent flow.
* Use the next, after router, parent flow {@link MessageChannel} as a
* {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
* @return the router spec.
*/
public S defaultOutputToParentFlow() {
并且有一个关于此事的文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-routers
我有一个基于 Spring 集成的应用程序,该应用程序的一部分具有以下配置。
它在 4.3 版本中运行良好。10.RELEASE。但是现在我正在尝试将版本升级到 5.1。5.RELEASE 并且应用程序无法启动,出现下面提到的异常。
我能够通过更改配置来解决问题,但我想知道为什么会发生这种变化
在 4.3 中工作的旧配置。10.RELEASE:
@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
return flow -> flow.channel(inboundChannel)
.channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
.publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
flow1.handle(statusUpdater, "handle"))
.subscribe(flow2 -> flow2.transform(myTransformer)
.filter(myFilter, "filter")
.route(messageRouter)
.handle(inlineHandler, "handle")
).errorHandler(new MessagePublishingErrorHandler())
);
}
上述 5.1 版配置的日志中出现异常。5.RELEASE:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. at org.springframework.integration.dsl.IntegrationFlowDefinition.registerOutputChannelIfCan(IntegrationFlowDefinition.java:3088) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.register(IntegrationFlowDefinition.java:3033) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1185) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1001) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:979) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$null(IntegrationConfig.java:42) ~[classes/:na] at org.springframework.integration.dsl.PublishSubscribeSpec.subscribe(PublishSubscribeSpec.java:58) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$null(IntegrationConfig.java:38) ~[classes/:na] at org.springframework.integration.dsl.IntegrationFlowDefinition.publishSubscribeChannel(IntegrationFlowDefinition.java:255) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$mainFlow(IntegrationConfig.java:36) ~[classes/:na] at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.processIntegrationFlowImpl(IntegrationFlowBeanPostProcessor.java:309) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.postProcessBeforeInitialization(IntegrationFlowBeanPostProcessor.java:117) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:414) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
更新了适用于 5.1 的配置。5.RELEASE:
inlineHandler
bean 中发生的处理现在发生在它自己的 channel/flow 中,而不是内联。我不得不更改 messageRouter
bean 将消息路由到早些时候内联发生的默认流。
@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
return flow -> flow.channel(inboundChannel)
.channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
.publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
flow1.handle(statusUpdater, "handle"))
.subscribe(flow2 -> flow2.transform(myTransformer)
.filter(myFilter, "filter")
.route(messageRouter)
//.handle(inlineHandler, "handle") // removed this
).errorHandler(new MessagePublishingErrorHandler())
);
}
@Bean
public AbstractMessageRouter messageRouter(@Qualifier("swift") MessageChannel messageChannel, MessageChannel defaultFlow) {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
System.out.println(">>>>>> Determining channel");
if(//some cond) {
return Collections.singleton(defaultFlow);
}
return Collections.singleton(messageChannel);
}
};
}
I am wondering why this has been changed
好吧,让我们再看看错误信息:
The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
现在让我们来看看路由器并了解它的不同之处,例如来自服务激活器:
如您所见,路由器有一个 mapping
来确定将消息发送到的进一步通道。在您的旧配置中:
.route(messageRouter)
.handle(inlineHandler, "handle")
您刚刚为 default channel
提供了提供此方法链的无条件映射。然而,在那个旧版本中,我们忽略了一个事实,即回退到默认输出并不是默认的(参见 resolutionRequired = true
选项)。因此,我们不能假设 end-users 总是希望回退到主流。更重要的是,它给人的印象是你总是可以在流程的中间配置路由器。从版本 5.0
开始,我们将 IntegrationFlowDefinition
与路由器性质保持一致,并引入了一个选项来明确假设:
/**
* Make a default output mapping of the router to the parent flow.
* Use the next, after router, parent flow {@link MessageChannel} as a
* {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
* @return the router spec.
*/
public S defaultOutputToParentFlow() {
并且有一个关于此事的文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-routers