RabbitMQ 在处理长 运行 任务和超时设置产生错误时关闭连接

RabbitMQ closes connection when processing long running tasks and timeout settings produce errors

我正在使用 RabbitMQ 生产者向消费者发送长 运行 任务(30 分钟以上)。问题是当与服务器的连接关闭并且未确认的任务重新排队时,消费者仍在处理任务。

通过研究我了解到可以使用 heartbeat or an increased connection timeout 来解决这个问题。这两种解决方案在尝试时都会引发错误。在阅读类似帖子的答案时,我还了解到自发布答案以来,RabbitMQ 已经实施了许多更改(例如,默认心跳超时已从 RabbitMQ 3.5.5 之前的 580 更改为 60)。

指定心跳和阻塞连接超时时:

credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()

显示以下错误:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'

在连接参数中指定 heartbeat_interval=1000 时,会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

socket_timeout = 1000 类似,显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

我是 运行 RabbitMQ 3.6.1,鼠兔 0.10.0 和 python 2.7 Ubuntu 14.04。

  1. 为什么上述方法会产生错误?
  2. 是否可以在有长时间 运行 连续任务的情况下使用心跳方法?例如,在执行需要 30 分钟以上的大型数据库连接时可以使用心跳吗?我赞成心跳方法,因为很多时候很难判断数据库连接等任务需要多长时间。

我已通读类似问题的答案

更新: 运行 code from the pika documentation 产生同样的错误。

我的系统 运行 遇到了您所看到的相同问题,即在执行非常长的任务时连接中断。

如果您的网络设置使得闲置 TCP/IP 连接被强制断开,心跳可能有助于保持连接有效。但是,如果不是这种情况,则更改心跳也无济于事。

更改连接超时根本无济于事。此设置仅在最初创建连接时使用。

I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued.

这有两个原因,您已经 运行 了解这两个原因:

  1. 连接随机断开,即使在最好的情况下也是如此
  2. 由于消息重新排队而重新启动进程可能会导致问题

部署 RabbitMQ 代码的任务范围从不到一秒到几个小时不等,我发现立即确认消息并使用状态消息更新系统最适合非常长的任务,例如这样。

您将需要一个记录系统(可能带有数据库)来跟踪给定作业的状态。

当消费者拿起一条消息并启动流程时,它应该立即确认消息并向记录系统发送一条 "started" 状态消息。

流程完成后,发送另一条消息告知已完成。

这不会解决连接断开的问题,但无论如何都无法 100% 解决该问题。相反,它将防止在连接断开时发生消息重新排队问题。

不过,这个解决方案确实引入了另一个问题:当冗长的 运行ning 进程崩溃时,您如何恢复工作?

基本的答案是使用工作的记录系统(您的数据库)状态来告诉您需要重新开始该工作。当应用程序启动时,检查数据库以查看是否有未完成的工作。如果有,以任何适当的方式恢复或重新启动该工作。

我已经看到这个问题了。原因是你声明要使用这个队列。但是你没有在exchange中绑定queue

例如:

 @Bean(name = "test_queue")
 public Queue testQueue() {
        return queue("test_queue");
 }

@RabbitListener(queues = "test_queue_1")
public void listenCreateEvent(){
}

如果你监听队列没有绑定到交换器。它会发生。