如何打破没有任何传入消息的(asyncio)websocket 获取循环?
How to break out of an (asyncio) websocket fetch loop that doesn't have any incoming messages?
此代码打印来自 websocket 连接的所有消息:
class OrderStreamer:
def __init__(ᬑ):
ᬑ.terminate_flag = False
# worker thread to receive data stream
ᬑ.worker_thread = threading.Thread(
target=ᬑ.worker_thread_func,
daemon=True
)
def start_streaming(ᬑ, from_scheduler = False):
ᬑ.worker_thread.start()
def terminate(ᬑ):
ᬑ.terminate_flag = True
def worker_thread_func(ᬑ):
asyncio.run(ᬑ.aio_func()) # blocks
async def aio_func(ᬑ):
async with \
aiohttp.ClientSession() as session, \
session.ws_connect(streams_url) as wsock, \
anyio.create_task_group() as tg:
async for msg in wsock:
print(msg.data)
if ᬑ.terminate_flag:
await wsock.close()
问题是如果没有消息到达,循环永远不会有机会检查 terminate_flag
并且永远不会退出。
我尝试创建对 runloop 和 websocket 的外部引用:
async with \
aiohttp.ClientSession() as session, \
session.ws_connect(streams_url) as wsock, \
anyio.create_task_group() as tg:
ᬑ.wsock = wsock
ᬑ.loop = asyncio.get_event_loop()
...并修改我的终止函数:
def terminate(ᬑ):
# ᬑ.loop.stop()
asyncio.set_event_loop(ᬑ.loop)
async def kill():
await ᬑ.wsock.close()
asyncio.run(kill())
...但它不起作用。
此时我无法重新构建我的整个应用程序以使用 asyncio。
如何跳出循环?
您应该使用 asyncio.wait_for
或 asyncio.wait
并直接调用 wsock.__anext__()
而不是使用 async for
循环。
asyncio.wait
的循环应该是这样的:
next_message = asyncio.create_task(wsock.__anext__())
while not self.terminate_flag:
await asyncio.wait([next_message], timeout=SOME_TIMEOUT,)
if next_message.done():
try:
msg = next_message.result()
except StopAsyncIteration:
break
else:
print(msg.data)
next_message = asyncio.create_task(wsock.__anext__())
SOME_TIMEOUT
应替换为您希望连续等待下一条传入消息的秒数
Here 是 asyncio.wait
的文档
P.S。我用 self
替换了 ᬑ
,但我希望你能理解
请注意,要读取数据,您不应创建新任务,如前所述here:
Reading from the WebSocket (await ws.receive()) must only be done inside the request handler task;
你可以简单地使用超时。
async def handler(request):
ws = web.WebSocketResponse() # or web.WebSocketResponse(receive_timeout=5)
await ws.prepare(request)
while True:
try:
msg = await ws.receive(timeout=5)
except asyncio.TimeoutError:
print('TimeoutError')
if your_terminate_flag is True:
break
aiohttp/web_protocol.py/_handle_request()
如果您不写 try/except 或没有捕获正确的异常,将转储错误。尝试测试 except Exception as err:
或检查其源代码。
此代码打印来自 websocket 连接的所有消息:
class OrderStreamer:
def __init__(ᬑ):
ᬑ.terminate_flag = False
# worker thread to receive data stream
ᬑ.worker_thread = threading.Thread(
target=ᬑ.worker_thread_func,
daemon=True
)
def start_streaming(ᬑ, from_scheduler = False):
ᬑ.worker_thread.start()
def terminate(ᬑ):
ᬑ.terminate_flag = True
def worker_thread_func(ᬑ):
asyncio.run(ᬑ.aio_func()) # blocks
async def aio_func(ᬑ):
async with \
aiohttp.ClientSession() as session, \
session.ws_connect(streams_url) as wsock, \
anyio.create_task_group() as tg:
async for msg in wsock:
print(msg.data)
if ᬑ.terminate_flag:
await wsock.close()
问题是如果没有消息到达,循环永远不会有机会检查 terminate_flag
并且永远不会退出。
我尝试创建对 runloop 和 websocket 的外部引用:
async with \
aiohttp.ClientSession() as session, \
session.ws_connect(streams_url) as wsock, \
anyio.create_task_group() as tg:
ᬑ.wsock = wsock
ᬑ.loop = asyncio.get_event_loop()
...并修改我的终止函数:
def terminate(ᬑ):
# ᬑ.loop.stop()
asyncio.set_event_loop(ᬑ.loop)
async def kill():
await ᬑ.wsock.close()
asyncio.run(kill())
...但它不起作用。
此时我无法重新构建我的整个应用程序以使用 asyncio。
如何跳出循环?
您应该使用 asyncio.wait_for
或 asyncio.wait
并直接调用 wsock.__anext__()
而不是使用 async for
循环。
asyncio.wait
的循环应该是这样的:
next_message = asyncio.create_task(wsock.__anext__())
while not self.terminate_flag:
await asyncio.wait([next_message], timeout=SOME_TIMEOUT,)
if next_message.done():
try:
msg = next_message.result()
except StopAsyncIteration:
break
else:
print(msg.data)
next_message = asyncio.create_task(wsock.__anext__())
SOME_TIMEOUT
应替换为您希望连续等待下一条传入消息的秒数
Here 是 asyncio.wait
P.S。我用 self
替换了 ᬑ
,但我希望你能理解
请注意,要读取数据,您不应创建新任务,如前所述here:
Reading from the WebSocket (await ws.receive()) must only be done inside the request handler task;
你可以简单地使用超时。
async def handler(request):
ws = web.WebSocketResponse() # or web.WebSocketResponse(receive_timeout=5)
await ws.prepare(request)
while True:
try:
msg = await ws.receive(timeout=5)
except asyncio.TimeoutError:
print('TimeoutError')
if your_terminate_flag is True:
break
aiohttp/web_protocol.py/_handle_request()
如果您不写 try/except 或没有捕获正确的异常,将转储错误。尝试测试 except Exception as err:
或检查其源代码。