RabbitMQ broken pipe error or lost messages

I connect to RabbitMQ with the pika library's BlockingConnection. Publishing a message occasionally fails with:

Fatal Socket Error: error(32, 'Broken pipe')

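This is a very simple subprocess that takes some information off an in-memory queue and publishes a small JSON message over AMQP. The error only seems to occur when the system hasn't sent any messages for a few minutes.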

Setup:

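import pika

# Presumably the body of the connect() method that the retry code below calls again.
# `parameters` is a pika.ConnectionParameters instance built elsewhere.
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(
    exchange='xyz',
    exchange_type='fanout',
    passive=False,
    durable=True,
    auto_delete=False
)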

The enqueue code catches any connection error and retries:

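import json
import logging
import socket

import pika


class EnqueueException(Exception):
    """Raised when the broker does not confirm a published message."""


def _enqueue(self, message_id, data):
    try:
        # pika 0.x's BlockingChannel.basic_publish returns a bool (False if the broker
        # nacks or returns the message); pika 1.x returns None and raises instead.
        published = self.channel.basic_publish(
            self.amqp_exchange,
            self.amqp_routing_key,
            json.dumps(data),
            pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,  # persistent message
                message_id=message_id
            )
        )

        # Confirm delivery or retry
        if published:
            self.retry_count = 0
        else:
            raise EnqueueException("Message publish not confirmed.")

    except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
            pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
            pika.exceptions.UnroutableError, socket.timeout):
        self.retry_count += 1
        if self.retry_count < 5:
            logging.warning("Reconnecting and resending")
            # Drop the possibly half-dead connection and open a fresh one
            if self.connection.is_open:
                self.connection.close()
            self.connect()
            self._enqueue(message_id, data)
        else:
            # Reset so the next message gets its own retries, then re-raise with the original traceback
            self.retry_count = 0
            raise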
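The retry above recurses into `_enqueue` and keeps its counter on the instance. A minimal alternative sketch (not the original poster's code) is an iterative loop with exponential backoff. `publish_with_retry`, its parameters, and the delay values are hypothetical: in the class above, `publish` could wrap the `self.channel.basic_publish(...)` call, `reconnect` could be `self.connect`, and `retry_on` the same exception tuple.

```python
import logging
import time
from typing import Callable, Tuple, Type


def publish_with_retry(
    publish: Callable[[], None],
    reconnect: Callable[[], None],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 5,
    base_delay: float = 0.5,
) -> int:
    """Call publish() until it succeeds, reconnecting between failures.

    Returns the number of attempts used; re-raises the last error once
    max_attempts is exhausted.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            publish()
            return attempt
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the original error
            logging.warning("Publish failed (attempt %d/%d), reconnecting", attempt, max_attempts)
            time.sleep(base_delay * 2 ** (attempt - 1))  # 0.5 s, 1 s, 2 s, ... by default
            reconnect()
    raise RuntimeError("unreachable")
```

Because the counter is a local variable, every message starts with a fresh set of attempts, and the stack does not grow with each retry. An exception raised by `reconnect()` itself is not caught here and will propagate.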

This sometimes works on the second attempt. More often it hangs for a while, or drops messages, before eventually throwing an exception (possibly related bug report). Since it only happens when the system has been quiet for a few minutes, I'm guessing it's due to a connection timeout. But AMQP has a heartbeat system, and pika reportedly uses it (related bug report).

Why am I getting this error or losing messages, and why doesn't the connection stay open while it isn't being used?

A Broken Pipe error means that something tried to write to a socket whose other end had already closed the connection.
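A minimal, stdlib-only sketch of what that looks like from Python, independent of pika. It assumes a POSIX system: on Linux and macOS `socket.socketpair()` returns a connected pair of Unix-domain sockets, while on Windows it uses TCP loopback and the error may differ. `demo_broken_pipe` is a hypothetical helper.

```python
import errno
import socket


def demo_broken_pipe() -> int:
    """Write to a socket whose peer has already closed; return the errno raised (0 if none)."""
    local, peer = socket.socketpair()  # two connected stream sockets
    peer.close()  # the other end goes away, like a broker dropping an idle client
    try:
        local.sendall(b'{"hello": "world"}')
    except BrokenPipeError as exc:  # Python ignores SIGPIPE, so EPIPE surfaces as an exception
        return exc.errno
    finally:
        local.close()
    return 0


if __name__ == "__main__":
    code = demo_broken_pipe()
    print(code, errno.errorcode.get(code))
```

Over a real TCP connection the first write after the peer disappears can still succeed, with the error only showing up on a later write, which is one reason the failure can appear intermittent.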

As far as I can see, you have a shared "self.connection" that may have been closed before or during use by a parallel thread?
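pika's documentation states that it has no notion of threading and recommends one connection per thread, so if several threads can reach `self.connection`, one option is to stop sharing it. A minimal sketch, assuming a hypothetical `PerThreadConnection` wrapper and a `factory` such as `lambda: pika.BlockingConnection(parameters)`; the class itself only uses `threading.local`, so it can be tried without a broker.

```python
import threading
from typing import Any, Callable


class PerThreadConnection:
    """Hand each thread its own connection object, created lazily by `factory`.

    A connection that reports is_open == False is replaced on the next get().
    """

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._local = threading.local()  # attributes stored here are private to each thread

    def get(self) -> Any:
        conn = getattr(self._local, "conn", None)
        # Objects without an is_open attribute are treated as open
        if conn is None or not getattr(conn, "is_open", True):
            conn = self._factory()
            self._local.conn = conn
        return conn
```

With this, a thread can only close its own connection, and a connection found closed is simply recreated on next use instead of being written to.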

You can also set the logging level to DEBUG and check the client's log to see when the client closes the connection.
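One way to do that with the standard `logging` module, assuming pika's loggers all live under the `pika` name (its modules call `logging.getLogger(__name__)`). `enable_pika_debug_logging` is a hypothetical helper; the timestamps make it easier to line up the disconnect with the idle period.

```python
import logging


def enable_pika_debug_logging() -> logging.Logger:
    """Emit pika's DEBUG records with timestamps while keeping everything else at INFO."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    pika_logger = logging.getLogger("pika")  # parent of pika.connection, pika.adapters.*, ...
    pika_logger.setLevel(logging.DEBUG)
    return pika_logger
```

Calling it once at start-up, before the connection is opened, is enough; child loggers inherit the DEBUG level from `pika`.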

From another bug report:

As BlockingConnection doesn't handle heartbeats in the background and the heartbeat_interval can't override the server's suggested heartbeat interval (that's a bug too), I suggest that heartbeats should be disabled by default (rely on TCP keep-alive instead).

If processing a task in a consume block takes longer than the server's suggested heartbeat interval, the connection will be closed by the server and the client won't be able to ack the message when it's done processing.
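For reference, this is how TCP keep-alive is switched on for a plain Python socket. pika opens its own socket, so this illustrates the mechanism the bug report refers to rather than a pika setting. The per-probe timers (`TCP_KEEPIDLE`, `TCP_KEEPINTVL`, `TCP_KEEPCNT`) are platform-specific, so the sketch sets only those the running Python exposes; `enable_tcp_keepalive` and its default values are hypothetical.

```python
import socket


def enable_tcp_keepalive(sock: socket.socket, idle: int = 60, interval: int = 10, count: int = 5) -> None:
    """Turn on TCP keep-alive probes for `sock`, tuning the timers where the platform allows."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # idle: seconds of silence before the first probe; interval: seconds between probes;
    # count: unanswered probes before the kernel declares the connection dead.
    for name, value in (("TCP_KEEPIDLE", idle), ("TCP_KEEPINTVL", interval), ("TCP_KEEPCNT", count)):
        opt = getattr(socket, name, None)
        if opt is not None:
            sock.setsockopt(socket.IPPROTO_TCP, opt, value)
```

Keep-alive probes are handled by the kernel, so unlike AMQP heartbeats on a BlockingConnection they do not depend on the application calling into the client library while it is idle.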

The update in v1.0.0 may help with this issue.

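So I implemented a workaround. Every 30 seconds I publish a heartbeat message through the queue. This keeps the connection open, and has the added benefit of confirming to clients that my application is up and running.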
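A minimal sketch of that workaround, assuming the in-memory queue is a `queue.Queue` and that `publish` wraps `basic_publish` on the existing channel. `pump`, the heartbeat payload, and the `None` stop marker are hypothetical. Rather than a fixed 30-second timer, it sends a heartbeat only when nothing else has been published for a full interval, which is enough to keep the connection from sitting idle.

```python
import json
import queue
import time
from typing import Callable, Optional

HEARTBEAT_INTERVAL = 30.0  # seconds, matching the workaround described above


def pump(
    messages: "queue.Queue[Optional[dict]]",
    publish: Callable[[str], None],
    interval: float = HEARTBEAT_INTERVAL,
) -> None:
    """Forward queued dicts as JSON; publish a heartbeat after `interval` idle seconds.

    A None item on the queue stops the loop.
    """
    while True:
        try:
            item = messages.get(timeout=interval)
        except queue.Empty:
            # Nothing to send for a full interval: emit a heartbeat so the connection stays busy
            publish(json.dumps({"type": "heartbeat", "ts": time.time()}))
            continue
        if item is None:
            return
        publish(json.dumps(item))
```

Consumers that only care about real messages can filter on the `type` field, while a monitor can treat a missing heartbeat as a sign that the publisher is down.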