永久 JMS 连接失败但没有错误或 InactivityExceptions

Permanent JMS Connection failure but no error or InactivityExceptions

我们遇到了与此类似的问题 old question。然而,我们的设置有点不同。例如,心跳应该已经存在,因为我们有来自 ActiveMQ 的默认 InactivityMonitor。

我们有一个使用嵌入式代理的客户端。嵌入式代理有一个连接到远程代理 运行 的网络连接器,作为机器上的独立服务。通过这种方式,我们可以解耦客户端和服务器之间的通信。嵌入式代理充当客户端的本地队列。

客户端向嵌入式代理发送消息。这些消息要么通过网络连接器流向远程代理,要么(当连接暂时不可用时)留在嵌入式代理中,直到重新建立连接。

嵌入式代理和远程代理都是 Apache ActiveMQ 的实例。 JMS 实现基于 Spring JMS。

在实践中,我们有时会看到奇怪的行为(通常在很长一段时间后没有任何问题):

在远程代理上启用了不活动监视器。嵌入式代理是使用下面显示的代码创建的(为简洁起见,省略了 SSL 代码)。

我们不知道是什么原因导致了这个问题,更重要的是,不知道为什么安装程序没有自动恢复。我们希望此设置能够自动检测连接不可用并设置新连接。然而,这似乎并没有发生。

有没有人知道我们可以从哪里开始寻找,或者更好的是,知道问题可能出在哪里?

@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory(final Optional<TransportListener> transportListener)
{
    final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();

    activeMQConnectionFactory.setClientIDPrefix(clientIDPrefix);
  activeMQConnectionFactory.setBrokerURL(this.env.getProperty("activemq.connection.url"));
    activeMQConnectionFactory.setUserName(this.env.getProperty("activemq.users.username"));
    activeMQConnectionFactory.setPassword(this.env.getProperty("activemq.users.password"));

    transportListener.ifPresent(activeMQConnectionFactory::setTransportListener);

    return activeMQConnectionFactory;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory(final ActiveMQConnectionFactory activeMQConnectionFactory)
{
    return new CachingConnectionFactory(activeMQConnectionFactory);
}

@Bean
public JmsTemplate jmsTemplate(final CachingConnectionFactory cachingConnectionFactory)
{
    final JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
    return jmsTemplate;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(final ActiveMQConnectionFactory activeMQConnectionFactory)
{
    final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(activeMQConnectionFactory);
    factory.setConcurrency("3-10");

    return factory;
}

@Bean
public MessageSender messageSender()
{
    return new MessageSender();
}

@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService bufferingBroker() throws Exception
{
    final BrokerService broker = new BrokerService();

    String brokerName = UUID.randomUUID().toString();
    broker.setBrokerName(brokerName);

    final NetworkConnector networkConnector = new DiscoveryNetworkConnector(
        new URI("static://" + this.env.getProperty("activemq.connection.url")));
    networkConnector.setUserName(this.env.getProperty("activemq.users.username"));
    networkConnector.setPassword(this.env.getProperty("activemq.users.password"));
    networkConnector.setNetworkTTL(5);
    broker.addNetworkConnector(networkConnector);

    return broker;
}

@Bean
public ActiveMQConnectionFactory embeddedActiveMQConnectionFactory(final BrokerService bufferingBroker) throws Exception
{
    final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL("vm://" + bufferingBroker.getBrokerName() + "?create=false");

    return activeMQConnectionFactory;
}

@Bean
public CachingConnectionFactory embeddedCachingConnectionFactory(final ActiveMQConnectionFactory embeddedActiveMQConnectionFactory) throws Exception
{
    return new CachingConnectionFactory(embeddedActiveMQConnectionFactory);
}

@Bean
public JmsTemplate embeddedJmsTemplate(final CachingConnectionFactory embeddedCachingConnectionFactory) throws Exception
{
    return new JmsTemplate(embeddedCachingConnectionFactory);
}

@Bean
public DefaultJmsListenerContainerFactory embeddedJmsListenerContainerFactory(final ActiveMQConnectionFactory embeddedActiveMQConnectionFactory) throws Exception
{
    final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(embeddedActiveMQConnectionFactory);
    factory.setConcurrency("3-10");

    return factory;
}

@Bean
public BufferedMessageSender bufferedMessageSender()
{
    return new BufferedMessageSender();
}

原来是外部防火墙阻止了字节消息,而文本消息可以毫无问题地通过。