Rabbitmq:重启集群中的节点后,SAC 队列的消费者未连接
Rabbitmq: Consumers for SAC queues are not connected after node in a cluster is restarted
设置:
rabbitmq 节点的 3 节点集群(通过 docker),在 ha-proxy 之后。
版本:
- RabbitMQ:3.8.11
- 二郎:23.2.3
- Spring-amqp: 1.7.3
Spring-boot(1.5.4) 应用有 3 个队列。
- 定义为“独占”,持久化,自动删除为false
- 定义为“SAC”,持久化,自动删除为false
- 经典、持久、自动删除为假
政策:
场景:
- 当应用程序第一次启动时,队列被正确注册。
- 我随机关闭任何节点,如果它是主节点,则触发镜像并且其中一个镜像节点变得 master.All 到目前为止很好。
- 当我启动该节点时,我开始在应用程序日志中收到异常:
日志:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - failed to perform operation on queue 'single-active-consumer-message-queue' in vhost '/dev' due to timeout, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.2.jar!/:4.0.2]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.2.jar!/:4.0.2]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.2.jar!/:4.0.2]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.2.jar!/:4.0.2]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.2.jar!/:4.0.2]
... 25 common frames omitted
它尝试重新连接 3 次,然后最终打印下面的日志
2021-03-18 17:08:55.487 ERROR 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2021-03-18 17:08:55.487 INFO 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-03-18 17:08:55.487 INFO 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
在 RabbitMQ 控制台上我看到了这个:
CachingConnectionFactory 是用 ha-proxy 的基本连接细节定义的
@Bean
protected SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, RetryOperationsInterceptor retryAdvice) {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setDefaultRequeueRejected(false);
containerFactory.setAdviceChain(retryAdvice);
containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return containerFactory;
}
Boot 1.5.x 和 Spring AMQP 1.7.x 已停产,不再受支持。
也就是说,以下内容也适用于 1。7.x。
如果队列恢复时间超过 15 秒(默认),就会出现这种情况。
这由 2 个容器属性控制。
/**
* Set the number of retries after passive queue declaration fails.
* @param declarationRetries The number of retries, default 3.
* @since 1.3.9
* @see #setFailedDeclarationRetryInterval(long)
*/
public void setDeclarationRetries(int declarationRetries) {
this.declarationRetries = declarationRetries;
}
/**
* Set the interval between passive queue declaration attempts in milliseconds.
* @param failedDeclarationRetryInterval the interval, default 5000.
* @since 1.3.9
*/
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) {
this.failedDeclarationRetryInterval = failedDeclarationRetryInterval;
}
您可以增加其中一项或两项,以防止容器在这种情况下停止。
关于在这种情况下单个活动消费者队列的预期行为,我建议您在 rabbitmq-users Google 组上询问。
此行为是由 RabbitMQ 中的错误引起的。好像在3.8.17版本修复了。
设置: rabbitmq 节点的 3 节点集群(通过 docker),在 ha-proxy 之后。
版本:
- RabbitMQ:3.8.11
- 二郎:23.2.3
- Spring-amqp: 1.7.3
Spring-boot(1.5.4) 应用有 3 个队列。
- 定义为“独占”,持久化,自动删除为false
- 定义为“SAC”,持久化,自动删除为false
- 经典、持久、自动删除为假
政策:
场景:
- 当应用程序第一次启动时,队列被正确注册。
- 我随机关闭任何节点,如果它是主节点,则触发镜像并且其中一个镜像节点变得 master.All 到目前为止很好。
- 当我启动该节点时,我开始在应用程序日志中收到异常:
日志:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - failed to perform operation on queue 'single-active-consumer-message-queue' in vhost '/dev' due to timeout, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.2.jar!/:4.0.2]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.2.jar!/:4.0.2]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.2.jar!/:4.0.2]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.2.jar!/:4.0.2]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.2.jar!/:4.0.2]
... 25 common frames omitted
它尝试重新连接 3 次,然后最终打印下面的日志
2021-03-18 17:08:55.487 ERROR 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2021-03-18 17:08:55.487 INFO 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-03-18 17:08:55.487 INFO 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
在 RabbitMQ 控制台上我看到了这个:
CachingConnectionFactory 是用 ha-proxy 的基本连接细节定义的
@Bean
protected SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, RetryOperationsInterceptor retryAdvice) {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setDefaultRequeueRejected(false);
containerFactory.setAdviceChain(retryAdvice);
containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return containerFactory;
}
Boot 1.5.x 和 Spring AMQP 1.7.x 已停产,不再受支持。
也就是说,以下内容也适用于 1。7.x。
如果队列恢复时间超过 15 秒(默认),就会出现这种情况。
这由 2 个容器属性控制。
/**
* Set the number of retries after passive queue declaration fails.
* @param declarationRetries The number of retries, default 3.
* @since 1.3.9
* @see #setFailedDeclarationRetryInterval(long)
*/
public void setDeclarationRetries(int declarationRetries) {
this.declarationRetries = declarationRetries;
}
/**
* Set the interval between passive queue declaration attempts in milliseconds.
* @param failedDeclarationRetryInterval the interval, default 5000.
* @since 1.3.9
*/
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) {
this.failedDeclarationRetryInterval = failedDeclarationRetryInterval;
}
您可以增加其中一项或两项,以防止容器在这种情况下停止。
关于在这种情况下单个活动消费者队列的预期行为,我建议您在 rabbitmq-users Google 组上询问。
此行为是由 RabbitMQ 中的错误引起的。好像在3.8.17版本修复了。