Spring 集成多个队列消费者
Spring Integration multiple queue consumers
作为 的结果,我有一个使用自定义 TaskScheduler 立即使用队列中消息的轮询器:
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
IntegrationFlows
.from("inbound")
.channel(MessageChannels.priority().get())
.bridge(bridge -> bridge
.taskScheduler(taskScheduler)
.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
现在我想有多个线程并发消费,所以我尝试了:
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
scheduler.setPoolSize(4);
但是,由于在 AbstractPollingEndpoint
中任务调度器调度了一个同步轮询器(有点复杂),因此只创建了 1 个线程。如果我将 TaskExecutor 设置为除 SyncTaskExecutor
(默认)以外的任何值,我会 运行 进入大量计划任务(请参阅 )。
如何在 Spring 集成中同时从一个队列消费?这看起来很基本,但我找不到解决方案。
我可以使用 ExecutorChannel
而不是队列,但是,(AFAIK) 然后我失去了队列功能,如优先级、队列大小和我依赖的指标。
参见PollerSpec.taskExecutor()
:
/**
* Specify an {@link Executor} to perform the {@code pollingTask}.
* @param taskExecutor the {@link Executor} to use.
* @return the spec.
*/
public PollerSpec taskExecutor(Executor taskExecutor) {
这种方式在根据您的 taskScheduler
和 delay
定期安排任务后,真正的任务是在提供的执行程序的线程上执行的。默认情况下,它确实在调度程序的线程上执行任务。
更新
我不确定这是否满足您的要求,但这是保持队列逻辑和并行处理任何拉入的唯一方法:
.bridge(bridge -> bridge
.taskScheduler(taskScheduler)
.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
.channel(channels -> channel.executor(threadPoolExecutor()))
.fixedSubscriberChannel()
我是这样解决的:
- 执行轮询的single-threaded任务调度程序
- 带有同步队列的线程池执行器
这样,任务调度程序可以为每个执行程序分配 1 个任务,并在没有执行程序空闲时阻塞,从而不会耗尽源队列或垃圾任务。
@Bean
public IntegrationFlow extractTaskResultFlow() {
return IntegrationFlows
.from(ChannelNames.TASK_RESULT_QUEUE)
.bridge(bridge -> bridge
.taskScheduler(taskResultTaskScheduler())
.poller(Pollers
.fixedDelay(0)
.taskExecutor(taskResultExecutor())
.receiveTimeout(Long.MAX_VALUE)))
.handle(resultProcessor)
.channel(ChannelNames.TASK_FINALIZER_CHANNEL)
.get();
}
@Bean
public TaskExecutor taskResultExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, // corePoolSize
8, // maximumPoolSize
1L, // keepAliveTime
TimeUnit.MINUTES,
new SynchronousQueue<>(),
new CustomizableThreadFactory("resultProcessor-")
);
executor.setRejectedExecutionHandler(new CallerBlocksPolicy(Long.MAX_VALUE));
return new ErrorHandlingTaskExecutor(executor, errorHandler);
}
@Bean
public TaskScheduler taskResultTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultPoller-");
return scheduler;
}
(最初的示例是从链接问题中复制的,现在这个类似于我的实际解决方案)
作为
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
IntegrationFlows
.from("inbound")
.channel(MessageChannels.priority().get())
.bridge(bridge -> bridge
.taskScheduler(taskScheduler)
.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
现在我想有多个线程并发消费,所以我尝试了:
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultProcessor-");
scheduler.setPoolSize(4);
但是,由于在 AbstractPollingEndpoint
中任务调度器调度了一个同步轮询器(有点复杂),因此只创建了 1 个线程。如果我将 TaskExecutor 设置为除 SyncTaskExecutor
(默认)以外的任何值,我会 运行 进入大量计划任务(请参阅
如何在 Spring 集成中同时从一个队列消费?这看起来很基本,但我找不到解决方案。
我可以使用 ExecutorChannel
而不是队列,但是,(AFAIK) 然后我失去了队列功能,如优先级、队列大小和我依赖的指标。
参见PollerSpec.taskExecutor()
:
/**
* Specify an {@link Executor} to perform the {@code pollingTask}.
* @param taskExecutor the {@link Executor} to use.
* @return the spec.
*/
public PollerSpec taskExecutor(Executor taskExecutor) {
这种方式在根据您的 taskScheduler
和 delay
定期安排任务后,真正的任务是在提供的执行程序的线程上执行的。默认情况下,它确实在调度程序的线程上执行任务。
更新
我不确定这是否满足您的要求,但这是保持队列逻辑和并行处理任何拉入的唯一方法:
.bridge(bridge -> bridge
.taskScheduler(taskScheduler)
.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
.channel(channels -> channel.executor(threadPoolExecutor()))
.fixedSubscriberChannel()
我是这样解决的:
- 执行轮询的single-threaded任务调度程序
- 带有同步队列的线程池执行器
这样,任务调度程序可以为每个执行程序分配 1 个任务,并在没有执行程序空闲时阻塞,从而不会耗尽源队列或垃圾任务。
@Bean
public IntegrationFlow extractTaskResultFlow() {
return IntegrationFlows
.from(ChannelNames.TASK_RESULT_QUEUE)
.bridge(bridge -> bridge
.taskScheduler(taskResultTaskScheduler())
.poller(Pollers
.fixedDelay(0)
.taskExecutor(taskResultExecutor())
.receiveTimeout(Long.MAX_VALUE)))
.handle(resultProcessor)
.channel(ChannelNames.TASK_FINALIZER_CHANNEL)
.get();
}
@Bean
public TaskExecutor taskResultExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, // corePoolSize
8, // maximumPoolSize
1L, // keepAliveTime
TimeUnit.MINUTES,
new SynchronousQueue<>(),
new CustomizableThreadFactory("resultProcessor-")
);
executor.setRejectedExecutionHandler(new CallerBlocksPolicy(Long.MAX_VALUE));
return new ErrorHandlingTaskExecutor(executor, errorHandler);
}
@Bean
public TaskScheduler taskResultTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("resultPoller-");
return scheduler;
}
(最初的示例是从链接问题中复制的,现在这个类似于我的实际解决方案)