关闭可读队列中的连接 python
Close connection in readable queue python
当微服务从RabbitMQ取消息,处理数据时间长,关闭与窃听队列的连接时
Traceback (most recent call last):
File "/home/saturn/Logic/MAIN_1.py", line 200, in <module>
channel.start_consuming()
File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 1780, in start_consuming
self.connection.process_data_events(time_limit=None)
File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 707, in process_data_events
self._flush_output(common_terminator)
File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 474, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
现在任务处理时间将近 5 分钟。主要代码如 -
credentials = pika.PlainCredentials(username='NAME',password='PASSWORD')
ConnParr = pika.ConnectionParameters(host='HOST', credentials=credentials)
connection = pika.BlockingConnection(ConnParr)
channel = connection.channel()
def callback(ch, method, properties, body):
in_data = json.loads(body.decode('utf-8'))
main(in_data)
ch.basic_ack(delivery_tag=method.delivery_tag)
def main(in_data):
time.sleep(300)
channel.queue_declare(queue=IN_QUEUE)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=IN_QUEUE)
channel.start_consuming()
发生这种情况是因为 time.sleep
调用阻塞了您的主线程并阻止 Pika 从 RabbitMQ 发送和接收心跳消息。您有几种方法可以解决此问题:
升级到 Pika 0.12.0
,运行 你的 main
方法在一个单独的线程中,并在那个线程中使用 add_callback_threadsafe
来调用 basic_ack
在频道 docs.
使用异步使用者示例作为代码的起点。
要记住的重要部分是您不能阻止 Pika 的内部事件循环并期望连接保持活动状态。如果您需要进一步的帮助,Pika 的维护者(我和其他人)会监控 rabbitmq-users
and pika-python
邮件列表中的问题。
当微服务从RabbitMQ取消息,处理数据时间长,关闭与窃听队列的连接时
Traceback (most recent call last):
File "/home/saturn/Logic/MAIN_1.py", line 200, in <module>
channel.start_consuming()
File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 1780, in start_consuming
self.connection.process_data_events(time_limit=None)
File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 707, in process_data_events
self._flush_output(common_terminator)
File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 474, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
现在任务处理时间将近 5 分钟。主要代码如 -
credentials = pika.PlainCredentials(username='NAME',password='PASSWORD')
ConnParr = pika.ConnectionParameters(host='HOST', credentials=credentials)
connection = pika.BlockingConnection(ConnParr)
channel = connection.channel()
def callback(ch, method, properties, body):
in_data = json.loads(body.decode('utf-8'))
main(in_data)
ch.basic_ack(delivery_tag=method.delivery_tag)
def main(in_data):
time.sleep(300)
channel.queue_declare(queue=IN_QUEUE)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=IN_QUEUE)
channel.start_consuming()
发生这种情况是因为 time.sleep
调用阻塞了您的主线程并阻止 Pika 从 RabbitMQ 发送和接收心跳消息。您有几种方法可以解决此问题:
升级到 Pika
0.12.0
,运行 你的main
方法在一个单独的线程中,并在那个线程中使用add_callback_threadsafe
来调用basic_ack
在频道 docs.使用异步使用者示例作为代码的起点。
要记住的重要部分是您不能阻止 Pika 的内部事件循环并期望连接保持活动状态。如果您需要进一步的帮助,Pika 的维护者(我和其他人)会监控 rabbitmq-users
and pika-python
邮件列表中的问题。