如何在 Python 中使用 RabbitMQ pika.basic_consume 更改超时
How to change timeout using RabbitMQ pika.basic_consume in Python
使用 RabbitMQ Python 客户端 运行 subscriber.py
:
import pika, time
credentials = pika.PlainCredentials('user', 'pass')
parameters = pika.ConnectionParameters(host='localhost', port=6672, credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='my_queue')
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
time.sleep(600)
print ('process completed')
channel.basic_consume(queue='my_queue', on_message_callback=callback)
channel.start_consuming()
callback
函数完成后连接中断。
它似乎总是发生在第 60 秒。 channel.basic_consume()
方法似乎不想等待主线程完成回调函数。有没有办法确保连接在 60 秒后不会断开?
您的 time.sleep
调用阻塞了 Pika 的 I/O 循环,从而阻止处理心跳。 不要阻止 I/O 循环!!!
相反,您应该在一个单独的线程中完成长时间的工作,运行 并从该线程正确确认消息。还好我有an example right here: link
注意: RabbitMQ 团队监控 rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
我认为“heartbeat”参数可以解决这个问题。只需以秒为单位设置时间:
import pika, time
credentials = pika.PlainCredentials('user', 'pass')
parameters = pika.ConnectionParameters(host='localhost', port=6672, credentials=credentials, heartbeat=36000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='my_queue')
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
time.sleep(600)
print ('process completed')
channel.basic_consume(queue='my_queue', on_message_callback=callback)
channel.start_consuming()
使用 RabbitMQ Python 客户端 运行 subscriber.py
:
import pika, time
credentials = pika.PlainCredentials('user', 'pass')
parameters = pika.ConnectionParameters(host='localhost', port=6672, credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='my_queue')
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
time.sleep(600)
print ('process completed')
channel.basic_consume(queue='my_queue', on_message_callback=callback)
channel.start_consuming()
callback
函数完成后连接中断。
它似乎总是发生在第 60 秒。 channel.basic_consume()
方法似乎不想等待主线程完成回调函数。有没有办法确保连接在 60 秒后不会断开?
您的 time.sleep
调用阻塞了 Pika 的 I/O 循环,从而阻止处理心跳。 不要阻止 I/O 循环!!!
相反,您应该在一个单独的线程中完成长时间的工作,运行 并从该线程正确确认消息。还好我有an example right here: link
注意: RabbitMQ 团队监控 rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
我认为“heartbeat”参数可以解决这个问题。只需以秒为单位设置时间:
import pika, time
credentials = pika.PlainCredentials('user', 'pass')
parameters = pika.ConnectionParameters(host='localhost', port=6672, credentials=credentials, heartbeat=36000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='my_queue')
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
time.sleep(600)
print ('process completed')
channel.basic_consume(queue='my_queue', on_message_callback=callback)
channel.start_consuming()