如何打破没有任何传入消息的(asyncio)websocket 获取循环?

How to break out of an (asyncio) websocket fetch loop that doesn't have any incoming messages?

此代码打印来自 websocket 连接的所有消息:

class OrderStreamer:
    def __init__(ᬑ):
        ᬑ.terminate_flag = False

        # worker thread to receive data stream
        ᬑ.worker_thread = threading.Thread(
            target=ᬑ.worker_thread_func,
            daemon=True
            )

    def start_streaming(ᬑ, from_scheduler = False):
        ᬑ.worker_thread.start()


    def terminate(ᬑ):
        ᬑ.terminate_flag = True


    def worker_thread_func(ᬑ):
        asyncio.run(ᬑ.aio_func())  # blocks


    async def aio_func(ᬑ):
        async with \
                aiohttp.ClientSession() as session, \
                session.ws_connect(streams_url) as wsock, \
                anyio.create_task_group() as tg:

            async for msg in wsock:
                print(msg.data)

                if ᬑ.terminate_flag:
                    await wsock.close()

问题是如果没有消息到达,循环永远不会有机会检查 terminate_flag 并且永远不会退出。

我尝试创建对 runloop 和 websocket 的外部引用:

        async with \
                aiohttp.ClientSession() as session, \
                session.ws_connect(streams_url) as wsock, \
                anyio.create_task_group() as tg:

            ᬑ.wsock = wsock
            ᬑ.loop = asyncio.get_event_loop()

...并修改我的终止函数:

    def terminate(ᬑ):
        # ᬑ.loop.stop()
        asyncio.set_event_loop(ᬑ.loop)

        async def kill():
            await ᬑ.wsock.close()

        asyncio.run(kill())

...但它不起作用。

此时我无法重新构建我的整个应用程序以使用 asyncio。

如何跳出循环?

您应该使用 asyncio.wait_forasyncio.wait 并直接调用 wsock.__anext__() 而不是使用 async for 循环。

asyncio.wait 的循环应该是这样的:

next_message = asyncio.create_task(wsock.__anext__())

while not self.terminate_flag:
    await asyncio.wait([next_message], timeout=SOME_TIMEOUT,)
    if next_message.done():
        try:
            msg = next_message.result()
        except StopAsyncIteration:
            break
        else:
            print(msg.data)
            next_message = asyncio.create_task(wsock.__anext__())

SOME_TIMEOUT 应替换为您希望连续等待下一条传入消息的秒数

Hereasyncio.wait

的文档

P.S。我用 self 替换了 ,但我希望你能理解

请注意,要读取数据,您不应创建新任务,如前所述here:

Reading from the WebSocket (await ws.receive()) must only be done inside the request handler task;

你可以简单地使用超时。

async def handler(request):
    ws = web.WebSocketResponse()  # or web.WebSocketResponse(receive_timeout=5)
    await ws.prepare(request)
    while True:
        try:
            msg = await ws.receive(timeout=5)
        except asyncio.TimeoutError:
            print('TimeoutError')
            if your_terminate_flag is True:
                break

aiohttp/web_protocol.py/_handle_request() 如果您不写 try/except 或没有捕获正确的异常,将转储错误。尝试测试 except Exception as err: 或检查其源代码。