robust_connection 来自 aio_pika 未重新连接
robust_connection from aio_pika not reconnecting
我有以下使用 asyncio 和 iohttp 的应用程序。在应用程序启动时,我启动了 Web 界面 (iohttp) 和 aio_pika 消费者。到目前为止,一切都对我有用,唯一的问题是 robut 连接不会尝试重新连接。当我重新启动我的兔子 docker 图像时,我收到此日志消息:
[2019-11-21 14:05:13,069] INFO: Connection to amqp://guest:******@127.0.0.1/ closed. Reconnecting after 5 seconds.
但是我收到以下异常并且没有重新连接:
File "/usr/lib64/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
Traceback (most recent call last):
File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 148, in reconnect
await self.connect()
File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 134, in connect
await self.__cleanup_connection(e)
File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 96, in __cleanup_connection
self.connection.close(exc),
AttributeError: 'NoneType' object has no attribute 'close'
健壮的连接不应该再尝试重连吗? connection reset by peer
的问题似乎是兔子启动需要几秒钟。
这是我启动应用程序的方式:
def loop_in_thread(loop):
asyncio.set_event_loop(loop)
connection = loop.run_until_complete(main(loop))
try:
loop.run_forever()
finally:
loop.run_until_complete(connection.close())
if __name__ == "__main__":
app = setup_app()
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop_in_thread, args=(loop,))
t.start()
web.run_app(app)
这是我的消费者的样子:
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
print("jdklasjkl")
async with connection:
# Creating channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=False
)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
if queue.name in message.body.decode():
break
确保更新版本 6.4.1,这个 pr 应该可以解决这个问题 - https://github.com/mosquito/aio-pika/pull/267
我有以下使用 asyncio 和 iohttp 的应用程序。在应用程序启动时,我启动了 Web 界面 (iohttp) 和 aio_pika 消费者。到目前为止,一切都对我有用,唯一的问题是 robut 连接不会尝试重新连接。当我重新启动我的兔子 docker 图像时,我收到此日志消息:
[2019-11-21 14:05:13,069] INFO: Connection to amqp://guest:******@127.0.0.1/ closed. Reconnecting after 5 seconds.
但是我收到以下异常并且没有重新连接:
File "/usr/lib64/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
Traceback (most recent call last):
File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 148, in reconnect
await self.connect()
File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 134, in connect
await self.__cleanup_connection(e)
File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 96, in __cleanup_connection
self.connection.close(exc),
AttributeError: 'NoneType' object has no attribute 'close'
健壮的连接不应该再尝试重连吗? connection reset by peer
的问题似乎是兔子启动需要几秒钟。
这是我启动应用程序的方式:
def loop_in_thread(loop):
asyncio.set_event_loop(loop)
connection = loop.run_until_complete(main(loop))
try:
loop.run_forever()
finally:
loop.run_until_complete(connection.close())
if __name__ == "__main__":
app = setup_app()
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop_in_thread, args=(loop,))
t.start()
web.run_app(app)
这是我的消费者的样子:
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
print("jdklasjkl")
async with connection:
# Creating channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=False
)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
if queue.name in message.body.decode():
break
确保更新版本 6.4.1,这个 pr 应该可以解决这个问题 - https://github.com/mosquito/aio-pika/pull/267