Rabbitmq:重启集群中的节点后,SAC 队列的消费者未连接

Rabbitmq: Consumers for SAC queues are not connected after node in a cluster is restarted

设置: rabbitmq 节点的 3 节点集群(通过 docker),在 ha-proxy 之后。

版本:

Spring-boot(1.5.4) 应用有 3 个队列。

  1. 定义为“独占”,持久化,自动删除为false
  2. 定义为“SAC”,持久化,自动删除为false
  3. 经典、持久、自动删除为假

政策:

场景:

  1. 当应用程序第一次启动时,队列被正确注册。
  2. 我随机关闭任何节点,如果它是主节点,则触发镜像并且其中一个镜像节点变得 master.All 到目前为止很好。
  3. 当我启动该节点时,我开始在应用程序日志中收到异常:

日志:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - failed to perform operation on queue 'single-active-consumer-message-queue' in vhost '/dev' due to timeout, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.2.jar!/:4.0.2]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.2.jar!/:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.2.jar!/:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.2.jar!/:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.2.jar!/:4.0.2]
    ... 25 common frames omitted

它尝试重新连接 3 次,然后最终打印下面的日志

2021-03-18 17:08:55.487 ERROR 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2021-03-18 17:08:55.487  INFO 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-03-18 17:08:55.487  INFO 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

在 RabbitMQ 控制台上我看到了这个:

CachingConnectionFactory 是用 ha-proxy 的基本连接细节定义的

@Bean
    protected SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, RetryOperationsInterceptor retryAdvice) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setDefaultRequeueRejected(false);
        containerFactory.setAdviceChain(retryAdvice);
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        return containerFactory;
    }

Boot 1.5.x 和 Spring AMQP 1.7.x 已停产,不再受支持。

也就是说,以下内容也适用于 1。7.x。

如果队列恢复时间超过 15 秒(默认),就会出现这种情况。

这由 2 个容器属性控制。

/**
 * Set the number of retries after passive queue declaration fails.
 * @param declarationRetries The number of retries, default 3.
 * @since 1.3.9
 * @see #setFailedDeclarationRetryInterval(long)
 */
public void setDeclarationRetries(int declarationRetries) {
    this.declarationRetries = declarationRetries;
}

/**
 * Set the interval between passive queue declaration attempts in milliseconds.
 * @param failedDeclarationRetryInterval the interval, default 5000.
 * @since 1.3.9
 */
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) {
    this.failedDeclarationRetryInterval = failedDeclarationRetryInterval;
}

您可以增加其中一项或两项,以防止容器在这种情况下停止。

关于在这种情况下单个活动消费者队列的预期行为,我建议您在 rabbitmq-users Google 组上询问。

此行为是由 RabbitMQ 中的错误引起的。好像在3.8.17版本修复了。

https://github.com/rabbitmq/rabbitmq-server/issues/3072