Rabbitmq 连接在本地主机上突然关闭
Rabbitmq connection close abruptly on localhost
我们在本地主机 dev/testing 场景中突然关闭了与 rabbitmq 的连接问题。在我们的开发环境中,我们在每个开发人员 Windows 7 机器上安装了 rabbitmq,我们使用 java 客户端通过 Spring AMQP 库连接到它。一段时间内一切正常,但在某个时间点连接断开,并在 rabbitmq 日志中显示以下消息:
=WARNING REPORT==== 4-Jan-2016::14:39:37 ===
closing AMQP connection <0.3731.0> (127.0.0.1:50792 -> 127.0.0.1:5672):
connection_closed_abruptly
在客户端日志中我们有这个异常:
04/01/2016 14:39:37.181 (AMQP Connection 127.0.0.1:5672) ERROR [CachingConnectionFactory] Channel shutdown: connection error
04/01/2016 14:39:38.188 (SimpleAsyncTaskExecutor-2) WARN [SimpleMessageListenerContainer] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:723)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:713)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:571)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Unrecognized Windows Sockets error: 0: recv failed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
... 1 more
在客户端启动时,我们创建了一个动态队列并将多个消费者附加到其中。此连接删除导致队列被删除,并且任何后续重新创建它的尝试都会失败,并出现以下异常:
04/01/2016 14:39:38.239 (SimpleAsyncTaskExecutor-8) WARN [BlockingQueueConsumer] Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[MY_TEST_QU]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1165)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:704)
at com.sun.proxy.$Proxy77.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550)
... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
... 1 more
我们在 rabbitmq 3.4.x 和 3.5.x 和 Spring AMQP 1.3.x 和 1.5.x 中遇到了这个问题。有趣的是,这在我们的 QA 和生产环境中从未发生过,因为 rabbitmq 安装在单独的服务器上。
非常感谢任何帮助。
当没有队列可听时,org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations()
失败。
这不是队列 recreation
,它只是 PassiveDeclarations
。因此,如果您手动删除 "dynamic queue",您也应该手动重新创建它。
我的意思是,如果您不遵循 RabbitAdmin
bean 契约,您将别无选择,除非 re-create 手动执行。在这种情况下,您无需担心侦听器:当队列返回时,它会正确重新连接。
更新
从版本 1.5
Spring 开始,AMQP 提供 ListenerContainerConsumerFailedEvent
,它在所有 attemptPassiveDeclarations()
次尝试后准确地发出。因此,您可以捕获 ApplicationEvent
、stop()
您的侦听器容器,声明 queue
返回并再次 start()
容器。
如果您的队列不是 bean,则不会自动重新声明。
我们在本地主机 dev/testing 场景中突然关闭了与 rabbitmq 的连接问题。在我们的开发环境中,我们在每个开发人员 Windows 7 机器上安装了 rabbitmq,我们使用 java 客户端通过 Spring AMQP 库连接到它。一段时间内一切正常,但在某个时间点连接断开,并在 rabbitmq 日志中显示以下消息:
=WARNING REPORT==== 4-Jan-2016::14:39:37 ===
closing AMQP connection <0.3731.0> (127.0.0.1:50792 -> 127.0.0.1:5672):
connection_closed_abruptly
在客户端日志中我们有这个异常:
04/01/2016 14:39:37.181 (AMQP Connection 127.0.0.1:5672) ERROR [CachingConnectionFactory] Channel shutdown: connection error
04/01/2016 14:39:38.188 (SimpleAsyncTaskExecutor-2) WARN [SimpleMessageListenerContainer] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:723)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:713)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:571)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Unrecognized Windows Sockets error: 0: recv failed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
... 1 more
在客户端启动时,我们创建了一个动态队列并将多个消费者附加到其中。此连接删除导致队列被删除,并且任何后续重新创建它的尝试都会失败,并出现以下异常:
04/01/2016 14:39:38.239 (SimpleAsyncTaskExecutor-8) WARN [BlockingQueueConsumer] Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[MY_TEST_QU]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1165)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:704)
at com.sun.proxy.$Proxy77.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550)
... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
... 1 more
我们在 rabbitmq 3.4.x 和 3.5.x 和 Spring AMQP 1.3.x 和 1.5.x 中遇到了这个问题。有趣的是,这在我们的 QA 和生产环境中从未发生过,因为 rabbitmq 安装在单独的服务器上。
非常感谢任何帮助。
当没有队列可听时,org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations()
失败。
这不是队列 recreation
,它只是 PassiveDeclarations
。因此,如果您手动删除 "dynamic queue",您也应该手动重新创建它。
我的意思是,如果您不遵循 RabbitAdmin
bean 契约,您将别无选择,除非 re-create 手动执行。在这种情况下,您无需担心侦听器:当队列返回时,它会正确重新连接。
更新
从版本 1.5
Spring 开始,AMQP 提供 ListenerContainerConsumerFailedEvent
,它在所有 attemptPassiveDeclarations()
次尝试后准确地发出。因此,您可以捕获 ApplicationEvent
、stop()
您的侦听器容器,声明 queue
返回并再次 start()
容器。
如果您的队列不是 bean,则不会自动重新声明。