Spring 对 rabbitmq 的 amqp RPC 请求超时

Spring amqp RPC request to rabbitmq getting timeouts

我正在使用来自多个服务器的 AmqpTemplate.sendAndReceive func,它工作正常并且在 rabbitmq 重新启动或网络问题后重新连接工作。

它在几个星期内都运行良好,但突然间我的一台服务器出现问题 每个 sendAndReceive 调用超时,rabbitmq 正在获取消息并且它 正在处理,但 sendAndReceive 没有得到响应(回复超时设置为 60 秒,处理消息只需要几秒)。 其他服务器同时使用相同的代码和相同的队列工作。

服务器在我重新启动服务后才恢复工作。

我认为这是一些重新连接问题(即使消息已成功发送到 rabbitmq),也许 AmqpTemplate 响应侦听器没有重新连接或其他问题。

任何人都知道可能是什么问题?以及我该如何预防 它会再次发生吗?

我的 ConnectionFactory 设置:

setConnectionTimeout(1000);
setRequestedHeartbeat(100);
setTopologyRecoveryEnabled(true);
setAutomaticRecoveryEnabled(true);

编辑:

Spring-AMQP版本:1.4.3

<bean id="myConnectionFactory" class="path.to.myConnectionFactoryClass"></bean>

<rabbit:connection-factory id="myRabbitConnectionFactory" connection-factory="myConnectionFactory" channel-cache-size="25" />

<rabbit:template id="myTemplate" connection-factory="myRabbitConnectionFactory" reply-timeout="65000" />

编辑:

又发生了,我看到它在收到错误后停止工作:

org.springframework.amqp.AmqpConnectException: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:51)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:110)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1051)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)
at MyClass.onMessage(MyClass.java:1234)
at sun.reflect.GeneratedMethodAccessor3558.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy95.onMessage(Unknown Source)
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:237)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=13=]1(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doInTransaction(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doInTransaction(SimpleMessageListenerContainer.java:968)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:968)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:174)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:496)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:96)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:42)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:747)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access0(CachingConnectionFactory.java:736)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:416)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:392)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access0(CachingConnectionFactory.java:75)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:623)
at com.sun.proxy.$Proxy74.basicCancel(Unknown Source)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doInRabbit(RabbitTemplate.java:944)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doInRabbit(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1045)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)

不确定连接关闭的原因以及未重新连接的原因。

at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)

at MyClass.onMessage(MyClass.java:1234) ...

所以这不是服务器端的恢复;您的 MyClass 在收到消息后正在调用另一个 rabbit 模板发送和接收操作。所以它在这种情况下充当客户端。

但是,正常情况下,这个异常会抛给容器,rabbitmq会重新提交消息(除非你有确认模式NONE),只要你的监听器抛出异常给容器。如果您的侦听器捕获到异常并且除了记录之外什么都不做,您将看到此结果。

如果不想让rabbit重新提交失败的消息,或者入站容器不确认消息,可以在outbound rabbit模板中配置一个RetryTemplate来恢复连接;参见 the documentation

如果这不能解释您的情况,您需要显示完整的配置、MyClass.onMessage() 中的代码和完整的日志。

我认为它正在解决。 我没有在 OnMesaage 中发现异常,我相信它杀死了我的听众。