某些尝试在同一台机器上连接到 RabbitMQ 会导致 TimeoutException

Some attempts to connect to RabbitMQ in the same machine cause TimeoutException

我在 CentOS Linux 上有两个 Java 应用程序 运行,它们将消息重复发布到与它们安装在同一台机器上的 RabbitMQ 实例中的队列。

对于连接到队列的每条新消息,发布它,然后断开连接。

有时尝试建立连接会抛出 TimeoutException:

br.com.projectname.ProcessStoppingException: java.util.concurrent.TimeoutException
    at br.com.projectname.EscritorDaFilaDoProcessadorDePacotes.escrever(EscritorDaFilaDoProcessadorDePacotes.java:55)
    (...)
    at br.com.projectname.ViaDeComunicacao.run(ViaDeComunicacao.java:39)
Caused by: java.util.concurrent.TimeoutException
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:367)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:293)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:678)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:722)

可以通过执行批量发布来重现该问题,但有时在应用程序空闲时也会发生。

根据我的阅读,增加 5-10 秒的默认连接超时不是一个好主意,因为这样的异常表明我应该调查某处的配置或网络问题。

然而,RabbitMQ networking guide 并没有提供太多见解。按照最后一部分的说明,我使用 127.0.0.1 作为主机,所以我想 DNS 查找和反向查找不会发生。我还将 Erlang VM I/O 线程池(我认为可能与该问题相关的唯一配置)从默认值 30 增加到 60(即可用内核数的 15 倍这是 4) 无济于事。

有第三个应用程序通过 Channel#basicGet() 不断轮询它,以一种次优的方式使用队列,但我认为它不会导致问题,因为即使该应用程序它也会继续发生不是 运行.

有什么想法吗?超时似乎在不到 5 秒内发生,所以也许我应该尝试增加它。

编辑

代码非常简单:

public void escrever(JSONObject json) throws ProcessStoppingException {
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(porta);
        factory.setUsername(usuario);
        factory.setPassword(senha);
        factory.setVirtualHost(virtualHost);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(fila, true, false, false, null);
        channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_BASIC, json.toString().getBytes(StandardCharsets.UTF_8));
        channel.close();
        connection.close();
    } catch (IOException | TimeoutException e) {
        throw new ProcessStoppingException(e);
    }

通过保持连接持续打开并且只为每条消息执行创建通道、发布和关闭通道来解决它。

它并没有消除我对某些未处理的配置问题的怀疑,但至少它有效。