AMQP 入站通道适配器并发消费者属性与任务执行器之间的关系
Relationship between AMQP inbound channel adapter concurrent-consumers attribute and task executors
我正在使用
Spring 集成 4.1.2.RELEASE
Spring AMQP 1.4.3.RELEASE
我有一个 AMQP 入站通道适配器和一个 ThreadPoolTaskExecutor 配置如下:
<task:executor id="exec.newItems" pool-size="5" />
<int-amqp:inbound-channel-adapter
connection-factory="amqpConnectionFactory" auto-startup="true"
queue-names="#{newItemsQueueName}"
channel="newItems.payloadType.routingChannel"
message-converter="jsonMessageConverter"
acknowledge-mode="AUTO" error-channel="errorChannel"
concurrent-consumers="5"
mapped-request-headers="*"
channel-transacted="false"
task-executor="exec.newItems"
/>
我想知道在 AMQP 入站通道适配器中为并发消费者设置的值与任务执行器配置中的池大小之间的关系。
这是我在 Eclipse 中使用 JVM Monitor 插件观察到的结果。
如果concurrent-consumers大于pool-size并且pool-size为x,
然后创建了 x 个线程,但它们处于阻塞状态,并且
消息未被处理。
如果concurrent-consumers等于pool-size并且pool-size是x,
然后创建 x 个线程并处理消息。
如果并发消费者小于池大小并且
concurrent-consumers 为 y,则创建 y 个线程并发送消息
已处理。
我认为并发消费者可能正在执行程序上设置最大池大小。这是一个准确的观察吗?
我会说你的调查是正确的。
消费者在他们的 Runnable.run
实现中是 while(true)
的长期任务,因此他们每个人都永远从那个执行者那里获得一个线程(当然直到它死亡)。
默认Executor
是SimpleMessageListenerContainer
中的SimpleAsyncTaskExecutor
。这意味着任何新的消费者都有自己的线程并且不会导致阻塞问题。
在 ThreadPoolTaskExecutor
中,当池中没有足够的线程供我们使用时,我们确实会遇到一个问题。我们的一些消费者不会做他们的工作。当我们在不同的组件之间共享 taskExecutor
时,它可能越糟糕。
在应用程序中使用managed
执行器是个不错的主意(尤其是在AS环境中),即使是对于这种长期存在的任务,但我们确实应该确保我们拥有最佳配置用于并发。
我正在使用
Spring 集成 4.1.2.RELEASE
Spring AMQP 1.4.3.RELEASE
我有一个 AMQP 入站通道适配器和一个 ThreadPoolTaskExecutor 配置如下:
<task:executor id="exec.newItems" pool-size="5" />
<int-amqp:inbound-channel-adapter
connection-factory="amqpConnectionFactory" auto-startup="true"
queue-names="#{newItemsQueueName}"
channel="newItems.payloadType.routingChannel"
message-converter="jsonMessageConverter"
acknowledge-mode="AUTO" error-channel="errorChannel"
concurrent-consumers="5"
mapped-request-headers="*"
channel-transacted="false"
task-executor="exec.newItems"
/>
我想知道在 AMQP 入站通道适配器中为并发消费者设置的值与任务执行器配置中的池大小之间的关系。
这是我在 Eclipse 中使用 JVM Monitor 插件观察到的结果。
如果concurrent-consumers大于pool-size并且pool-size为x, 然后创建了 x 个线程,但它们处于阻塞状态,并且 消息未被处理。
如果concurrent-consumers等于pool-size并且pool-size是x, 然后创建 x 个线程并处理消息。
如果并发消费者小于池大小并且 concurrent-consumers 为 y,则创建 y 个线程并发送消息 已处理。
我认为并发消费者可能正在执行程序上设置最大池大小。这是一个准确的观察吗?
我会说你的调查是正确的。
消费者在他们的 Runnable.run
实现中是 while(true)
的长期任务,因此他们每个人都永远从那个执行者那里获得一个线程(当然直到它死亡)。
默认Executor
是SimpleMessageListenerContainer
中的SimpleAsyncTaskExecutor
。这意味着任何新的消费者都有自己的线程并且不会导致阻塞问题。
在 ThreadPoolTaskExecutor
中,当池中没有足够的线程供我们使用时,我们确实会遇到一个问题。我们的一些消费者不会做他们的工作。当我们在不同的组件之间共享 taskExecutor
时,它可能越糟糕。
在应用程序中使用managed
执行器是个不错的主意(尤其是在AS环境中),即使是对于这种长期存在的任务,但我们确实应该确保我们拥有最佳配置用于并发。