线程中的 Pika 异步发布者
Pika asynchronous publisher in a thread
我关注了 pika 的 Asynchronous publisher example 并试图 运行 它的 self._connection.ioloop.start()
在一个单独的线程中。到目前为止,我设法为主线程使用队列来添加要发布的消息。但是让发布者线程从队列中获取消息的唯一方法并不令人满意。我用了类似
的东西
try:
message = self._queue.get(True, 1)
self._channel.basic_publish(body=message, exchange=self._exchange, routing_key='example.text')
except queue.Empty:
pass
finally:
self._connection.add_timeout(0.0001, self.publish_message)
一定有更好的方法来做到这一点,对吧?重要的是要注意我在 Windows 中使用 Python 3.6.4 并且 pika.SelectConnection
选择的 IO 循环似乎非常有限...
编辑:我刚刚发现如何使用 adapters.AsyncioConnection
而不是 SelectConnection
。所以现在我可以用 self._connection.loop.call_soon(self.publish_message)
替换 self._connection.add_timeout(0.0001, self.publish_message)
。
这会产生非常奇怪的结果:消息似乎每秒都会缓冲和发送一次。我是 Python 的新手,所以非常感谢您提供一些见解!
这个问题的正确答案,如果它仍然出现在搜索结果中,至少升级到 Pika v0.12 并利用方法 add_callback_threadsafe 可用于各种连接适配器。 Here 是一个例子。
我关注了 pika 的 Asynchronous publisher example 并试图 运行 它的 self._connection.ioloop.start()
在一个单独的线程中。到目前为止,我设法为主线程使用队列来添加要发布的消息。但是让发布者线程从队列中获取消息的唯一方法并不令人满意。我用了类似
try:
message = self._queue.get(True, 1)
self._channel.basic_publish(body=message, exchange=self._exchange, routing_key='example.text')
except queue.Empty:
pass
finally:
self._connection.add_timeout(0.0001, self.publish_message)
一定有更好的方法来做到这一点,对吧?重要的是要注意我在 Windows 中使用 Python 3.6.4 并且 pika.SelectConnection
选择的 IO 循环似乎非常有限...
编辑:我刚刚发现如何使用 adapters.AsyncioConnection
而不是 SelectConnection
。所以现在我可以用 self._connection.loop.call_soon(self.publish_message)
替换 self._connection.add_timeout(0.0001, self.publish_message)
。
这会产生非常奇怪的结果:消息似乎每秒都会缓冲和发送一次。我是 Python 的新手,所以非常感谢您提供一些见解!
这个问题的正确答案,如果它仍然出现在搜索结果中,至少升级到 Pika v0.12 并利用方法 add_callback_threadsafe 可用于各种连接适配器。 Here 是一个例子。