如何在 pika 库上向方法 start_consuming() 添加超时
How to add a timeout to method start_consuming() on pika library
我有一个 BlockingConnection
,我遵循 the examples 鼠兔文档。但在所有这些中,开始消费消息的代码示例是:
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
(或多或少的细节)。
我必须编写很多脚本,我想一个接一个地 运行(为了 test/research 目的)。但是上面的代码要求我在每个代码中都添加 ^C。
我尝试添加一些超时 explained in the documentation,但我运气不好。例如,如果我找到一个参数,如果客户端在最后 X 秒内未使用任何消息,则设置,然后脚本完成。这在 pika lib 中可行吗?或者我必须改变方法?
如果您不想让代码阻塞,请不要使用 start_consuming
。使用 SelectConnection
或使用 consume
的 this method。您可以为传递给 consume
.
的参数添加超时
注意: RabbitMQ 团队监控 rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
import pika
parameters = pika.ConnectionParameters(host="localhost")
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
def ack_message(channel, method):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(method.delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
def callback(channel,method, properties, body):
ack_message(channel,method)
print("body",body, flush=True)
channel.basic_consume(
queue="hello", on_message_callback=callback)
channel.start_consuming()
connection.close()
我的原码是Luke Bakken的回答。
但是我稍微修改了代码。
:)
我有一个 BlockingConnection
,我遵循 the examples 鼠兔文档。但在所有这些中,开始消费消息的代码示例是:
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
(或多或少的细节)。
我必须编写很多脚本,我想一个接一个地 运行(为了 test/research 目的)。但是上面的代码要求我在每个代码中都添加 ^C。
我尝试添加一些超时 explained in the documentation,但我运气不好。例如,如果我找到一个参数,如果客户端在最后 X 秒内未使用任何消息,则设置,然后脚本完成。这在 pika lib 中可行吗?或者我必须改变方法?
如果您不想让代码阻塞,请不要使用 start_consuming
。使用 SelectConnection
或使用 consume
的 this method。您可以为传递给 consume
.
注意: RabbitMQ 团队监控 rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
import pika
parameters = pika.ConnectionParameters(host="localhost")
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
def ack_message(channel, method):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(method.delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
def callback(channel,method, properties, body):
ack_message(channel,method)
print("body",body, flush=True)
channel.basic_consume(
queue="hello", on_message_callback=callback)
channel.start_consuming()
connection.close()
我的原码是Luke Bakken的回答。
但是我稍微修改了代码。
:)