Spring 集成 - 分散-聚集
Spring Integration - Scatter-Gather
我正在使用 Spring 集成和 Scatter Gather 处理程序 (https://docs.spring.io/spring-integration/docs/5.3.0.M1/reference/html/scatter-gather.html) 以向外部 REST API 发送 3 个并行请求(使用 ExecutorChannels)并将它们的响应聚合到一条消息中。
在 Aggregator 的 aggregatePayloads 方法 (AggregatingMessageHandler) 中抛出异常之前一切正常。在这种情况下,错误消息已成功传送到启动流程的 Messaging Gateway(调用方)。但是,ScatterGatherHandler 线程保持挂起状态,等待收集器回复(我相信),由于其中的异常,该回复永远不会到达。即每个顺序调用都会留下一个额外的线程处于“卡住”状态,最终线程池会耗尽可用的工作线程。
我当前的 Scatter Gather 配置:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setChannels(Arrays.asList(Channel1(asyncExecutor()),Channel2(asyncExecutor()),Channel3(asyncExecutor())));
return router;
}
@Bean
public MessageHandler gatherer() {
AggregatingMessageHandler aggregatingMessageHandler = new AggregatingMessageHandler(
new TransactionAggregator(),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy("correlationID"),
new ExpressionEvaluatingReleaseStrategy("size() == 3"));
aggregatingMessageHandler.setExpireGroupsUponCompletion( true );
return aggregatingMessageHandler;
}
@Bean
@ServiceActivator(inputChannel = "validationOutputChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setErrorChannelName("scatterGatherErrorChannel");
return handler;
}
@Bean("taskExecutor")
@Primary
public TaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
到目前为止,我找到的唯一解决方案是为 ScatterGatherHandler 添加 RequiresReply 和 GatherTimeout 值,如下所示:
handler.setGatherTimeout(120000L);
handler.setRequiresReply(true);
这将产生一个异常,并在指定的超时值后以及在聚合器的异常被传递到消息传递网关后将 ScatterGatherHandler 的线程释放到拉取。我可以在日志中看到以下消息:
[AsyncThread-1] [WARN] [o.s.m.c.GenericMessagingTemplate$TemporaryReplyChannel:] [{}] - Reply message received but the receiving thread has already received a reply: ErrorMessage
还有其他方法可以实现吗?我的主要目标是确保在聚合器的 aggregatePayloads 方法中抛出异常时我不会阻塞任何线程。
谢谢。
从技术上讲,这确实是一种预期行为。请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#scatter-gather-error-handling
In this case a reasonable, finite gatherTimeout
must be configured for the ScatterGatherHandler
. Otherwise it is going to be blocked waiting for a reply from the gatherer forever, by default.
确实没有办法打破 ScatterGatherHandler
代码对 BlockingQueue.take()
的期望。
我正在使用 Spring 集成和 Scatter Gather 处理程序 (https://docs.spring.io/spring-integration/docs/5.3.0.M1/reference/html/scatter-gather.html) 以向外部 REST API 发送 3 个并行请求(使用 ExecutorChannels)并将它们的响应聚合到一条消息中。
在 Aggregator 的 aggregatePayloads 方法 (AggregatingMessageHandler) 中抛出异常之前一切正常。在这种情况下,错误消息已成功传送到启动流程的 Messaging Gateway(调用方)。但是,ScatterGatherHandler 线程保持挂起状态,等待收集器回复(我相信),由于其中的异常,该回复永远不会到达。即每个顺序调用都会留下一个额外的线程处于“卡住”状态,最终线程池会耗尽可用的工作线程。
我当前的 Scatter Gather 配置:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setChannels(Arrays.asList(Channel1(asyncExecutor()),Channel2(asyncExecutor()),Channel3(asyncExecutor())));
return router;
}
@Bean
public MessageHandler gatherer() {
AggregatingMessageHandler aggregatingMessageHandler = new AggregatingMessageHandler(
new TransactionAggregator(),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy("correlationID"),
new ExpressionEvaluatingReleaseStrategy("size() == 3"));
aggregatingMessageHandler.setExpireGroupsUponCompletion( true );
return aggregatingMessageHandler;
}
@Bean
@ServiceActivator(inputChannel = "validationOutputChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setErrorChannelName("scatterGatherErrorChannel");
return handler;
}
@Bean("taskExecutor")
@Primary
public TaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
到目前为止,我找到的唯一解决方案是为 ScatterGatherHandler 添加 RequiresReply 和 GatherTimeout 值,如下所示:
handler.setGatherTimeout(120000L);
handler.setRequiresReply(true);
这将产生一个异常,并在指定的超时值后以及在聚合器的异常被传递到消息传递网关后将 ScatterGatherHandler 的线程释放到拉取。我可以在日志中看到以下消息:
[AsyncThread-1] [WARN] [o.s.m.c.GenericMessagingTemplate$TemporaryReplyChannel:] [{}] - Reply message received but the receiving thread has already received a reply: ErrorMessage
还有其他方法可以实现吗?我的主要目标是确保在聚合器的 aggregatePayloads 方法中抛出异常时我不会阻塞任何线程。
谢谢。
从技术上讲,这确实是一种预期行为。请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#scatter-gather-error-handling
In this case a reasonable, finite
gatherTimeout
must be configured for theScatterGatherHandler
. Otherwise it is going to be blocked waiting for a reply from the gatherer forever, by default.
确实没有办法打破 ScatterGatherHandler
代码对 BlockingQueue.take()
的期望。