如何检测队列已被删除?
How to detect a queue has been deleted?
当我手动删除我的 PikaClient 使用的队列时,没有任何反应。我可以重新创建同名队列,但通道已停止使用队列(正常,因为我已将其删除)。但是我想在消费队列被删除时收到一个事件。
我预计频道会自动关闭,但 «on_channel_close_callback» 从未被调用。
«basic_consume» 不提供任何关闭回调。
还有一点很重要,我必须使用 TornadoConnection。
鼠兔:0.10.0
Python:2.7
龙卷风:4.3
谢谢你的帮助。
class PikaClient(object):
def __init__(self):
# init everything here
def connect(self):
pika.adapters.tornado_connection.TornadoConnection(connection_param, on_open_callback=self.on_connected)
def on_connected(self, connection):
self.logger.info('PikaClient: connected to RabbitMQ')
self.connected = True
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_open_error_callback(self, *args):
self.logger.error("on_open_error_callback")
def on_channel_open(self, channel):
channel.add_on_close_callback(self.on_channel_close_callback)
channel.basic_consume(self.on_message, queue=self.queue_name, no_ack=True)
def on_channel_close_callback(self, reply_code, reply_text):
self.logger.error("Consumer was cancelled remotely, shutting down", reply_code=reply_code, reply_text=reply_text)
我找到了解决方法。
我每隔 X 秒检查一次我的 PikaClient 是否消费了消息。如果没有,我将重新启动将自动创建队列的应用程序。
如果您有更好的解决方案,我仍然愿意提出建议。
def __init__(self):
...
self.have_messages_been_consumed = False
def on_connected(self, connection):
self.logger.info('PikaClient: connected to RabbitMQ')
self.connected = True
self.connection = connection
self.connection.add_timeout(X, self.check_if_messages_have_been_consumed)
self.connection.channel(self.on_channel_open)
def check_if_messages_have_been_consumed(self):
if self.have_messages_been_consumed:
self.have_messages_been_consumed = False
self.connection.add_timeout(X, self.check_if_messages_have_been_consumed)
else:
# close_and_restart will set to False have_messages_been_consumed
self.close_and_restart()
def on_message(self, channel, basic_deliver, header, body):
self.have_messages_been_consumed = True
...
当我手动删除我的 PikaClient 使用的队列时,没有任何反应。我可以重新创建同名队列,但通道已停止使用队列(正常,因为我已将其删除)。但是我想在消费队列被删除时收到一个事件。
我预计频道会自动关闭,但 «on_channel_close_callback» 从未被调用。 «basic_consume» 不提供任何关闭回调。 还有一点很重要,我必须使用 TornadoConnection。
鼠兔:0.10.0 Python:2.7 龙卷风:4.3
谢谢你的帮助。
class PikaClient(object):
def __init__(self):
# init everything here
def connect(self):
pika.adapters.tornado_connection.TornadoConnection(connection_param, on_open_callback=self.on_connected)
def on_connected(self, connection):
self.logger.info('PikaClient: connected to RabbitMQ')
self.connected = True
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_open_error_callback(self, *args):
self.logger.error("on_open_error_callback")
def on_channel_open(self, channel):
channel.add_on_close_callback(self.on_channel_close_callback)
channel.basic_consume(self.on_message, queue=self.queue_name, no_ack=True)
def on_channel_close_callback(self, reply_code, reply_text):
self.logger.error("Consumer was cancelled remotely, shutting down", reply_code=reply_code, reply_text=reply_text)
我找到了解决方法。 我每隔 X 秒检查一次我的 PikaClient 是否消费了消息。如果没有,我将重新启动将自动创建队列的应用程序。
如果您有更好的解决方案,我仍然愿意提出建议。
def __init__(self):
...
self.have_messages_been_consumed = False
def on_connected(self, connection):
self.logger.info('PikaClient: connected to RabbitMQ')
self.connected = True
self.connection = connection
self.connection.add_timeout(X, self.check_if_messages_have_been_consumed)
self.connection.channel(self.on_channel_open)
def check_if_messages_have_been_consumed(self):
if self.have_messages_been_consumed:
self.have_messages_been_consumed = False
self.connection.add_timeout(X, self.check_if_messages_have_been_consumed)
else:
# close_and_restart will set to False have_messages_been_consumed
self.close_and_restart()
def on_message(self, channel, basic_deliver, header, body):
self.have_messages_been_consumed = True
...