SpringJMS - 如何断开 MessageListenerContainer
SpringJMS - How to Disconnect a MessageListenerContainer
我想断开队列的 DefaultMessageListenerContainer。我正在使用 dmlc.stop()、dmlc.shutdown()。在连接时,有 5 个消费者线程连接到队列。当我尝试断开连接时,4 个消费者断开连接,但 1 个消费者保持连接。 (请参阅线程末尾的屏幕截图)。
环境
1. ActiveMQ 与 AMQP
2. SpringJMS 与 ApacheQpid
问题
调用 destroy 和 stop 方法后,仍然有一个消费者连接到队列。
所需的解决方案
我想知道如何完全断开与队列的消费者连接为零的 MessageListenerContainer。
配置和代码
@Bean
public DefaultMessageListenerContainer getMessageContainer(ConnectionFactory amqpConnectionFactory, QpidConsumer messageConsumer){
DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
listenerContainer.setConcurrency("5-20");
listenerContainer.setRecoveryInterval(jmsRecInterval);
listenerContainer.setConnectionFactory(new CachingConnectionFactory(amqpConnectionFactory));
listenerContainer.setMessageListener(messageConsumer);
listenerContainer.setDestinationName(destinationName);
return listenerContainer;
}
private void stopListenerIfRunning() {
DefaultMessageListenerContainer dmlc = (DefaultMessageListenerContainer) ctx.getBean("messageContainer");
if (null != dmlc) {
if(!dmlc.isRunning()){return;}
dmlc.stop(new Runnable() {
@Override
public void run() {
logger.debug("Closed Listener Container for Connection {}", sub.getQueueName());
if (sub.getSubscriptionStatus() == SubscriptionStatus.DELETED
|| sub.getSubscriptionStatus() == SubscriptionStatus.SUSPENDED_DELETE) {
listenerHandles.remove(sub.getQueueName());
}
}
});
dmlc.destroy();
dmlc.shutdown();
}
}
}
listenerContainer.setConnectionFactory(newCachingConnectionFactory(amqpConnectionFactory));
你需要摧毁CachingConnectionFactory
。
您通常不需要带有侦听器容器的缓存工厂,因为会话是长期存在的;如果你有可变的并发,你绝对不应该;来自 javadocs...
* <p><b>Note: Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
* in combination with dynamic scaling.</b> Ideally, don't use it with a message
* listener container at all, since it is generally preferable to let the
* listener container itself handle appropriate caching within its lifecycle.
* Also, stopping and restarting a listener container will only work with an
* independent, locally cached Connection - not with an externally cached one.
如果要缓存连接,请使用 SingleConnectionFactory
或在 CCF 上调用 setCacheConsumers(false)
。
我想断开队列的 DefaultMessageListenerContainer。我正在使用 dmlc.stop()、dmlc.shutdown()。在连接时,有 5 个消费者线程连接到队列。当我尝试断开连接时,4 个消费者断开连接,但 1 个消费者保持连接。 (请参阅线程末尾的屏幕截图)。
环境 1. ActiveMQ 与 AMQP 2. SpringJMS 与 ApacheQpid
问题 调用 destroy 和 stop 方法后,仍然有一个消费者连接到队列。
所需的解决方案
我想知道如何完全断开与队列的消费者连接为零的 MessageListenerContainer。
配置和代码
@Bean
public DefaultMessageListenerContainer getMessageContainer(ConnectionFactory amqpConnectionFactory, QpidConsumer messageConsumer){
DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
listenerContainer.setConcurrency("5-20");
listenerContainer.setRecoveryInterval(jmsRecInterval);
listenerContainer.setConnectionFactory(new CachingConnectionFactory(amqpConnectionFactory));
listenerContainer.setMessageListener(messageConsumer);
listenerContainer.setDestinationName(destinationName);
return listenerContainer;
}
private void stopListenerIfRunning() {
DefaultMessageListenerContainer dmlc = (DefaultMessageListenerContainer) ctx.getBean("messageContainer");
if (null != dmlc) {
if(!dmlc.isRunning()){return;}
dmlc.stop(new Runnable() {
@Override
public void run() {
logger.debug("Closed Listener Container for Connection {}", sub.getQueueName());
if (sub.getSubscriptionStatus() == SubscriptionStatus.DELETED
|| sub.getSubscriptionStatus() == SubscriptionStatus.SUSPENDED_DELETE) {
listenerHandles.remove(sub.getQueueName());
}
}
});
dmlc.destroy();
dmlc.shutdown();
}
}
}
listenerContainer.setConnectionFactory(newCachingConnectionFactory(amqpConnectionFactory));
你需要摧毁CachingConnectionFactory
。
您通常不需要带有侦听器容器的缓存工厂,因为会话是长期存在的;如果你有可变的并发,你绝对不应该;来自 javadocs...
* <p><b>Note: Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
* in combination with dynamic scaling.</b> Ideally, don't use it with a message
* listener container at all, since it is generally preferable to let the
* listener container itself handle appropriate caching within its lifecycle.
* Also, stopping and restarting a listener container will only work with an
* independent, locally cached Connection - not with an externally cached one.
如果要缓存连接,请使用 SingleConnectionFactory
或在 CCF 上调用 setCacheConsumers(false)
。