Spring 集成 JMS 消费者未使用所有消息
Spring Integration JMS Consumers not consuming all messages
设置
我有 Spring 名为 Dispatcher 的启动应用程序。它在 1 台机器上运行并具有嵌入式 ActiveMQ Broker:
@Bean
public BrokerService broker(ActiveMQProperties properties) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector(properties.getBrokerUrl());
return broker;
}
将任务写入 JMS 队列:
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(taskQueue())
.bridge(Bridges.blockingPoller(outboundTaskScheduler()))
.transform(outboundTransformer)
.handle(Jms.outboundAdapter(connectionFactory)
.extractPayload(false)
.destination(JmsQueueNames.STANDARD_TASKS))
.get();
}
@Bean
public QueueChannel standardTaskQueue() {
return MessageChannels.priority()
.comparator(TASK_PRIO_COMPARATOR)
.get();
}
// 2 more queues with different names but same config
Worker 应用程序在 10 台机器上运行,每台机器有 20 个内核,配置如下:
@Bean
public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
int maxWorkers = 20;
return IntegrationFlows
.from(Jms.channel(connectionFactory)
.sessionTransacted(true)
.concurrentConsumers(maxWorkers)
.taskExecutor(
Executors.newFixedThreadPool(maxWorkers, new CustomizableThreadFactory("standard-"))
)
.destination(JmsQueueNames.STANDARD_TASKS))
.channel(ChannelNames.TASKS_INBOUND)
.get();
}
// 2 more inbound queues with different names but same config
第二个队列重复此操作,外加 1 个特殊情况。所以 共有 401 个消费者。
观察
使用JConsole,我可以看到ActiveMQ队列中有任务:
[TODO 插入截图]
不出所料,在任何 Worker 机器上,都有 20 个消费者线程:
[TODO 插入截图]
但大多数(如果不是全部)空闲,即使队列中仍有消息。使用我们的监控工具,我看到在任何给定时间都在处理大约 50 到 400 个任务,而期望值是恒定的 400。
我还观察到 Spring 为每个消费者创建 AbstractPollingMessageListenerContainer
,这似乎导致每个应用程序每个队列每秒打开 1 个 JMS 连接(每秒 33 个连接)。
调查
所以我发现 I do not receive messages in my second consumer 暗示 prefetch
是罪魁祸首。这听起来很有道理,所以我在每个 worker 上配置了 tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1
。然而,在任何时候都只处理了大约 25 个任务,这对我来说完全没有意义。
问题
我似乎不明白发生了什么,而且由于我 运行 没有时间进行调查,我希望任何人都可以为我指出正确的方向。哪些因素可能是原因? consumers/connections的数量?预取?还有什么吗?
原来是预取策略导致的。在我的例子中正确的配置是使用 tcp://dispatcher:61616?jms.prefetchPolicy.all=0
在我之前的(失败的)测试中我使用了 jms.prefetchPolicy.queuePrefetch=1
但事后我不确定我是否在正确的地方配置它。
设置
我有 Spring 名为 Dispatcher 的启动应用程序。它在 1 台机器上运行并具有嵌入式 ActiveMQ Broker:
@Bean
public BrokerService broker(ActiveMQProperties properties) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector(properties.getBrokerUrl());
return broker;
}
将任务写入 JMS 队列:
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(taskQueue())
.bridge(Bridges.blockingPoller(outboundTaskScheduler()))
.transform(outboundTransformer)
.handle(Jms.outboundAdapter(connectionFactory)
.extractPayload(false)
.destination(JmsQueueNames.STANDARD_TASKS))
.get();
}
@Bean
public QueueChannel standardTaskQueue() {
return MessageChannels.priority()
.comparator(TASK_PRIO_COMPARATOR)
.get();
}
// 2 more queues with different names but same config
Worker 应用程序在 10 台机器上运行,每台机器有 20 个内核,配置如下:
@Bean
public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
int maxWorkers = 20;
return IntegrationFlows
.from(Jms.channel(connectionFactory)
.sessionTransacted(true)
.concurrentConsumers(maxWorkers)
.taskExecutor(
Executors.newFixedThreadPool(maxWorkers, new CustomizableThreadFactory("standard-"))
)
.destination(JmsQueueNames.STANDARD_TASKS))
.channel(ChannelNames.TASKS_INBOUND)
.get();
}
// 2 more inbound queues with different names but same config
第二个队列重复此操作,外加 1 个特殊情况。所以 共有 401 个消费者。
观察
使用JConsole,我可以看到ActiveMQ队列中有任务:
[TODO 插入截图]
不出所料,在任何 Worker 机器上,都有 20 个消费者线程:
[TODO 插入截图]
但大多数(如果不是全部)空闲,即使队列中仍有消息。使用我们的监控工具,我看到在任何给定时间都在处理大约 50 到 400 个任务,而期望值是恒定的 400。
我还观察到 Spring 为每个消费者创建 AbstractPollingMessageListenerContainer
,这似乎导致每个应用程序每个队列每秒打开 1 个 JMS 连接(每秒 33 个连接)。
调查
所以我发现 I do not receive messages in my second consumer 暗示 prefetch
是罪魁祸首。这听起来很有道理,所以我在每个 worker 上配置了 tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1
。然而,在任何时候都只处理了大约 25 个任务,这对我来说完全没有意义。
问题
我似乎不明白发生了什么,而且由于我 运行 没有时间进行调查,我希望任何人都可以为我指出正确的方向。哪些因素可能是原因? consumers/connections的数量?预取?还有什么吗?
原来是预取策略导致的。在我的例子中正确的配置是使用 tcp://dispatcher:61616?jms.prefetchPolicy.all=0
在我之前的(失败的)测试中我使用了 jms.prefetchPolicy.queuePrefetch=1
但事后我不确定我是否在正确的地方配置它。