从线程消耗的鼠兔忽略了 KeyboardInterrupt
pika consuming from thread ignores KeyboardInterrupt
我正在尝试 运行 使用来自 github 的 pika 0.10.0 进行以下测试:
import logging
import sys
import pika
import threading
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
URL = 'amqp://guest:guest@127.0.0.1:5672/%2F?socket_timeout=0.25'
class BaseConsumer(threading.Thread):
def run(self):
self._connection = None
self._channel = None
self.connect()
self.open_channel()
self.consume()
def connect(self):
parameters = pika.URLParameters(URL)
self._connection = pika.BlockingConnection(parameters)
def open_channel(self):
self._channel = self._connection.channel()
self._channel.exchange_declare(exchange='exc1', exchange_type='topic', passive=False,
durable=False, auto_delete=False, internal=False, arguments=None)
self._channel.queue_declare(queue='test', passive=False, durable=False,
exclusive=False, auto_delete=False, arguments=None)
self._channel.queue_bind(
'test', 'exc1', routing_key='rk', arguments=None)
def consume(self):
self._channel.basic_consume(self.on_message, 'test')
try:
self._channel.start_consuming()
except KeyboardInterrupt:
logging.info("Stop consuming now!")
self._channel.stop_consuming()
self._connection.close()
def on_message(self, channel, method_frame, header_frame, body):
print method_frame.delivery_tag
print body
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
c1 = BaseConsumer()
c1.setDaemon(False)
c1.start()
脚本正在连接到我的 MQ,并且显然能够使用来自 MQ 的消息。问题是我无法停止线程。在我的键盘上按 CTRL-C 只会导致“^C”出现在控制台中而不会中断消费。
问题是,如何让 pika 在线程中 运行ning 时停止消耗?我想指出,我正在遵循在消费者线程中创建连接的准则。
如果在使用 c1.start() 启动线程后,我还执行无限 while 循环并从那里记录一些内容,然后按 CTRL-C 将结束 while 循环,但消费者线程仍将忽略任何额外的 CTRL -C.
附带问题:是否可以通过 threading.Condition 之类的外部信号停止线程内部的消费?我看不出如何干扰 start_consuming.
Question: ... from there then pressing CTRL-C will end the while loop
将 def stop()
添加到您的 BaseConsumer
,
抓住 KeyboardInterrupt
并调用 stop()
.
try:
BaseConsumer.run()
except KeyboardInterrupt:
BaseConsumer.stop()
我正在尝试 运行 使用来自 github 的 pika 0.10.0 进行以下测试:
import logging
import sys
import pika
import threading
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
URL = 'amqp://guest:guest@127.0.0.1:5672/%2F?socket_timeout=0.25'
class BaseConsumer(threading.Thread):
def run(self):
self._connection = None
self._channel = None
self.connect()
self.open_channel()
self.consume()
def connect(self):
parameters = pika.URLParameters(URL)
self._connection = pika.BlockingConnection(parameters)
def open_channel(self):
self._channel = self._connection.channel()
self._channel.exchange_declare(exchange='exc1', exchange_type='topic', passive=False,
durable=False, auto_delete=False, internal=False, arguments=None)
self._channel.queue_declare(queue='test', passive=False, durable=False,
exclusive=False, auto_delete=False, arguments=None)
self._channel.queue_bind(
'test', 'exc1', routing_key='rk', arguments=None)
def consume(self):
self._channel.basic_consume(self.on_message, 'test')
try:
self._channel.start_consuming()
except KeyboardInterrupt:
logging.info("Stop consuming now!")
self._channel.stop_consuming()
self._connection.close()
def on_message(self, channel, method_frame, header_frame, body):
print method_frame.delivery_tag
print body
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
c1 = BaseConsumer()
c1.setDaemon(False)
c1.start()
脚本正在连接到我的 MQ,并且显然能够使用来自 MQ 的消息。问题是我无法停止线程。在我的键盘上按 CTRL-C 只会导致“^C”出现在控制台中而不会中断消费。
问题是,如何让 pika 在线程中 运行ning 时停止消耗?我想指出,我正在遵循在消费者线程中创建连接的准则。
如果在使用 c1.start() 启动线程后,我还执行无限 while 循环并从那里记录一些内容,然后按 CTRL-C 将结束 while 循环,但消费者线程仍将忽略任何额外的 CTRL -C.
附带问题:是否可以通过 threading.Condition 之类的外部信号停止线程内部的消费?我看不出如何干扰 start_consuming.
Question: ... from there then pressing CTRL-C will end the while loop
将 def stop()
添加到您的 BaseConsumer
,
抓住 KeyboardInterrupt
并调用 stop()
.
try:
BaseConsumer.run()
except KeyboardInterrupt:
BaseConsumer.stop()