有没有办法让 Pika BlockingConnection 一次消费一条消息?
Is there a way to let Pika BlockingConnection consume one message at a time?
import pika
params = pika.URLParameters([URL])
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='test', durable=True)
channel.basic_consume(do_things, queue='test')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
except:
rollbar.report_exc_info()
finally:
channel.close()
connection.close()
这是我用来消费消息的代码。问题是,假设我在 test
队列中有 100 条消息。一旦我启动消费者,它将获取所有 100 条消息并一条一条地处理它,即队列状态变为:消息就绪:0,未确认:100,总数:100。结果,我将无法旋转up new consumer并行处理100条消息,因为已经没有消息留给新consumer了(都被老consumer拿走了,虽然大部分消息没有处理)。有没有办法让消费者一次只能接受1条消息?
您需要指定您的频道所需的Quality of Service。
在您的情况下,prefetch_count
是您需要的参数。
import pika
params = pika.URLParameters([URL])
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
import pika
params = pika.URLParameters([URL])
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='test', durable=True)
channel.basic_consume(do_things, queue='test')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
except:
rollbar.report_exc_info()
finally:
channel.close()
connection.close()
这是我用来消费消息的代码。问题是,假设我在 test
队列中有 100 条消息。一旦我启动消费者,它将获取所有 100 条消息并一条一条地处理它,即队列状态变为:消息就绪:0,未确认:100,总数:100。结果,我将无法旋转up new consumer并行处理100条消息,因为已经没有消息留给新consumer了(都被老consumer拿走了,虽然大部分消息没有处理)。有没有办法让消费者一次只能接受1条消息?
您需要指定您的频道所需的Quality of Service。
在您的情况下,prefetch_count
是您需要的参数。
import pika
params = pika.URLParameters([URL])
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)