SimpleMessageListenerContainer 批量消息处理
SimpleMessageListenerContainer bulk message processing
我有一个传入数据流,作为单独的消息发送到 RabbitMQ。
我想将这些发送到需要一批消息的服务。当我有一批 1000 条消息或 5 秒过期时,我需要向服务发送请求。这可能使用 SimpleMessageListenerContainer 吗?
SimpleMessageListenerContainer 支持事务,但这对 5 秒超时没有帮助。我确实查看了方法 doReceiveAndExecute(BlockingQueueConsumer consumer) 和 "receiveTimeout",但由于此变量在事务循环内,我最终可能会等待每条消息 5 秒(1000*5 秒= 83 分钟)。
我目前有一个通道感知侦听器,它将消息分批放入一个批量处理器中,该处理器将管理我的超时和队列长度。 SimpleMessageListenerContainer 设置为手动确认。但是,作为在消息实际发送到服务之前的侦听器 returns,当我在通道关闭时确认消息时偶尔会遇到问题。
我考虑过编写自己的 ListenerContainer,将整个 BlockingQueueConsumer 发送到 Listener。这是唯一的解决方案还是已经有人成功地做了类似的事情?
可以使用ChannelAwareMessageListener,
集acknowledgeMode=MANUAL
;在听众中累积交付;启动一个计时器(计划任务)以在 +5 秒内执行并保留对通道的引用。当新的快递到达时,取消任务,将新的快递添加到集合中。
当 1000 个交付到达时(或计划任务触发);调用您的服务;然后使用 channel.basicAck()
(多个)确认已处理的消息。
您需要处理一些竞争条件,但这应该很容易。也许另一个批次队列最简单,因为一些其他线程正在等待批次到达该队列。
编辑
从 2.2 开始,SimpleMessageListenerContainer
支持本地发送批量消息 - 请参阅 Batched Messages。
Starting with version 2.2, the SimpleMessageListeneContainer
can be use to create batches on the consumer side (where the producer sent discrete messages).
Set the container property consumerBatchEnabled
to enable this feature. deBatchingEnabled
must also be true so that the container is responsible for processing batches of both types. Implement BatchMessageListener
or ChannelAwareBatchMessageListener
when consumerBatchEnabled is true. See @RabbitListener with Batching
for information about using this feature with @RabbitListener
.
我有一个传入数据流,作为单独的消息发送到 RabbitMQ。
我想将这些发送到需要一批消息的服务。当我有一批 1000 条消息或 5 秒过期时,我需要向服务发送请求。这可能使用 SimpleMessageListenerContainer 吗?
SimpleMessageListenerContainer 支持事务,但这对 5 秒超时没有帮助。我确实查看了方法 doReceiveAndExecute(BlockingQueueConsumer consumer) 和 "receiveTimeout",但由于此变量在事务循环内,我最终可能会等待每条消息 5 秒(1000*5 秒= 83 分钟)。
我目前有一个通道感知侦听器,它将消息分批放入一个批量处理器中,该处理器将管理我的超时和队列长度。 SimpleMessageListenerContainer 设置为手动确认。但是,作为在消息实际发送到服务之前的侦听器 returns,当我在通道关闭时确认消息时偶尔会遇到问题。
我考虑过编写自己的 ListenerContainer,将整个 BlockingQueueConsumer 发送到 Listener。这是唯一的解决方案还是已经有人成功地做了类似的事情?
可以使用ChannelAwareMessageListener,
集acknowledgeMode=MANUAL
;在听众中累积交付;启动一个计时器(计划任务)以在 +5 秒内执行并保留对通道的引用。当新的快递到达时,取消任务,将新的快递添加到集合中。
当 1000 个交付到达时(或计划任务触发);调用您的服务;然后使用 channel.basicAck()
(多个)确认已处理的消息。
您需要处理一些竞争条件,但这应该很容易。也许另一个批次队列最简单,因为一些其他线程正在等待批次到达该队列。
编辑
从 2.2 开始,SimpleMessageListenerContainer
支持本地发送批量消息 - 请参阅 Batched Messages。
Starting with version 2.2, the
SimpleMessageListeneContainer
can be use to create batches on the consumer side (where the producer sent discrete messages).Set the container property
consumerBatchEnabled
to enable this feature.deBatchingEnabled
must also be true so that the container is responsible for processing batches of both types. ImplementBatchMessageListener
orChannelAwareBatchMessageListener
when consumerBatchEnabled is true. See@RabbitListener with Batching
for information about using this feature with@RabbitListener
.