如何修复 ActiveMQ 持久订阅抛出 "durable consumer already in use" 错误

How to fix ActiveMQ durable subscription throwing "durable consumer already in use" error

我正在尝试编写一个基本的 ActiveMQ 客户端来收听一个主题。我正在使用 Spring Boot ActiveMQ。我有一个基于使用 DefaultJmsListenerContainerFactory 的各种教程构建的实现,但我在使其正常工作时遇到了一些问题。

@Configuration
@EnableJms
public class JmsConfig {
    @Bean
    public DefaultJmsListenerContainerFactory jmsContainerFactory(ConnectionFactory connectionFactory,
                                                                  DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConcurrency("3-10");
        factory.setConnectionFactory(connectionFactory);

        configurer.configure(factory, connectionFactory);

       factory.setSubscriptionDurable(true);
       factory.setClientId("someUniqueClientId");

       return factory;
   }
}

@JmsListener(destination="someTopic", containerFactory="jmsContainerFactory", subscription="someUniqueSubscription")
public void onMessage(String msg) {
    ...
}

一切正常,直到我尝试进行持久订阅。当我这样做时,我发现在容器工厂上设置了客户端 ID 时,我收到一条关于无法在共享连接上设置客户端 ID 的错误。

Cause: setClientID call not supported on proxy for shared Connection. Set the 'clientId' property on the SingleConnectionFactory instead.

当我更改代码以在连接工厂上设置客户端 ID(它是一个包装 ActiveMQConnectionFactory 的 CachingConnectionFactory)时,服务成功启动,读取几条消息,然后开始持续输出此错误:

Setup of JMS message listener invoker failed for destination 'someTopic' - trying to recover. Cause: Durable consumer is in use for client: someUniqueClientId and subscriptionName: someUniqueSubscription

我继续收到消息,但这个错误也混杂在日志中。这似乎是一个问题,但我真的不清楚如何解决它。

我确实有一个天真的实现,没有任何 spring 代码,直接使用 ActiveMQConnectionFactory 并且似乎很乐意使用持久消费者(但它有自己的不同问题)。无论如何,我不认为这是缺乏对另一端持久连接的支持。

我希望在这方面有更多经验的人可以帮助我弄清楚这个错误是否是我可以忽略的,或者我需要做些什么来解决它。

谢谢!

JMS 1.1(您使用的是 ActiveMQ 5.x)不支持共享持久订阅。因此,当您使用 setConcurrency("3-10") 和 Spring 尝试创建 > 1 个订阅时,您会收到错误消息。我看到解决这个问题的两种主要方法:

  1. 使用 setConcurrency("1") 会将 subscribers/consumers 的数量限制为 1。根据您的要求,这可能会对性能产生严重的负面影响。
  2. 切换到 ActiveMQ Artemis which does support JMS 2.0 and invoke setSubscriptionShared(true)