Spring 异常后未发出集成消息
Spring integration message not emitted after exception
我尝试处理 spring 集成流程中发生的异常。
流程如下:
来源 -> 拆分 -> 句柄 -> 句柄
我的来源提供了一个对象列表作为有效负载。
拆分一次发出一个元素。
第一个处理程序遇到异常。
我期望由于异常被发布到下面示例中配置的错误通道,列表中的其他元素将继续被发出。
但是在第一个异常之后流程停止了!!!
是否有我缺少的配置?
@Bean
public IntegrationFlow pubSubFlow(PublishSubscribeChannel publishSubscribeChannel,
@Qualifier("myMessagePublishingErrorHandler") MessagePublishingErrorHandler messagePublishingErrorHandler) {
return flow -> flow
.channel(publishSubscribeChannel)
.publishSubscribeChannel(config -> config
.subscribe(f1 -> f1
.split()
.handle("action", "act")
.handle(m1 -> System.out.println(">>>" + m1)))
.subscribe(f1 -> f1
.split()
.handle(m1 -> System.out.println("<<<" + m1)))
.errorHandler(messagePublishingErrorHandler));
}
错误处理程序:
@Bean
public MessagePublishingErrorHandler myMessagePublishingErrorHandler(@Qualifier("appErrorChannel") DirectChannel directChannel) {
MessagePublishingErrorHandler messagePublishingErrorHandler = new MessagePublishingErrorHandler();
messagePublishingErrorHandler.setDefaultErrorChannel(directChannel);
return messagePublishingErrorHandler;
}
@Bean
public DirectChannel appErrorChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow errorFlow(@Qualifier("appErrorChannel") DirectChannel directChannel) {
return IntegrationFlows.from(directChannel).handle(System.out::println).get();
}
我稍微重构了您的 pubSubFlow
代码以修复错误并提高可读性。请考虑在未来尊重我们帮助您使问题尽可能干净的努力。
所以,您的代码没有反映描述。您在每个 pub-sub 分支中都有 .split()
,而不是您可能希望的在主流中。
您在 publishSubscribeChannel
上有一个 messagePublishingErrorHandler
,因此,确实在那里处理了任何下游错误,但这是针对发送到 sub-flow 的消息完成的。由于拆分器是 sub-flow 的一部分(对于一条传入消息),它肯定会停止工作,因为它刚刚冒出一个错误。
请重新考虑您想要从流程中得到什么。而如果你只需要拆分一次然后pub-sub,那么移动.split()
到publishSubscribeChannel()
之前。
但是请注意,如果 messagePublishingErrorHandler
配置了 Executor
,messagePublishingErrorHandler
将在 PublishSubscribeChannel
上运行。
无论如何总是有办法在拆分器之后放置一个 ExecutorChannel
以并行处理项目并且不会因错误影响主拆分循环。
我尝试处理 spring 集成流程中发生的异常。
流程如下:
来源 -> 拆分 -> 句柄 -> 句柄
我的来源提供了一个对象列表作为有效负载。 拆分一次发出一个元素。
第一个处理程序遇到异常。 我期望由于异常被发布到下面示例中配置的错误通道,列表中的其他元素将继续被发出。 但是在第一个异常之后流程停止了!!! 是否有我缺少的配置?
@Bean
public IntegrationFlow pubSubFlow(PublishSubscribeChannel publishSubscribeChannel,
@Qualifier("myMessagePublishingErrorHandler") MessagePublishingErrorHandler messagePublishingErrorHandler) {
return flow -> flow
.channel(publishSubscribeChannel)
.publishSubscribeChannel(config -> config
.subscribe(f1 -> f1
.split()
.handle("action", "act")
.handle(m1 -> System.out.println(">>>" + m1)))
.subscribe(f1 -> f1
.split()
.handle(m1 -> System.out.println("<<<" + m1)))
.errorHandler(messagePublishingErrorHandler));
}
错误处理程序:
@Bean
public MessagePublishingErrorHandler myMessagePublishingErrorHandler(@Qualifier("appErrorChannel") DirectChannel directChannel) {
MessagePublishingErrorHandler messagePublishingErrorHandler = new MessagePublishingErrorHandler();
messagePublishingErrorHandler.setDefaultErrorChannel(directChannel);
return messagePublishingErrorHandler;
}
@Bean
public DirectChannel appErrorChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow errorFlow(@Qualifier("appErrorChannel") DirectChannel directChannel) {
return IntegrationFlows.from(directChannel).handle(System.out::println).get();
}
我稍微重构了您的 pubSubFlow
代码以修复错误并提高可读性。请考虑在未来尊重我们帮助您使问题尽可能干净的努力。
所以,您的代码没有反映描述。您在每个 pub-sub 分支中都有 .split()
,而不是您可能希望的在主流中。
您在 publishSubscribeChannel
上有一个 messagePublishingErrorHandler
,因此,确实在那里处理了任何下游错误,但这是针对发送到 sub-flow 的消息完成的。由于拆分器是 sub-flow 的一部分(对于一条传入消息),它肯定会停止工作,因为它刚刚冒出一个错误。
请重新考虑您想要从流程中得到什么。而如果你只需要拆分一次然后pub-sub,那么移动.split()
到publishSubscribeChannel()
之前。
但是请注意,如果 messagePublishingErrorHandler
配置了 Executor
,messagePublishingErrorHandler
将在 PublishSubscribeChannel
上运行。
无论如何总是有办法在拆分器之后放置一个 ExecutorChannel
以并行处理项目并且不会因错误影响主拆分循环。