如何同时处理多个 websocket 消息?
How to handle multiple websocket messages at the same time?
如果有人可以在 Python 和 async/await 方面帮助我,我们将不胜感激!
我需要监听 websocket 的消息,所以我设置了以下代码:
import websockets
import asyncio
my_socket = "ws://......."
# I set a "while True" here to reconnect websocket if it stop for any reason
while True:
try:
async with websockets.connect(my_socket) as ws:
# I set a "while True" here to keep listening to messages forever
while True:
await on_message(await ws.recv())
# If websocket gets closed for any reason, we catch exception and wait before new loop
except Exception as e:
print(e)
# Wait 10 secs before new loop to avoid flooding server if it is unavailable for any reason
await asyncio.sleep(10)
async def on_message(message):
# Do what needs to be done with received message
# This function is running for a few minutes, with a lot of sleep() time in it..
# .. so it does no hold process for itself
我想做的是:
- 收听消息
- 收到消息后,立即使用
on_message()
功能应用各种操作,持续几分钟
- 在
on_message()
处理之前的消息时继续收听消息
实际发生了什么:
- 收听消息
- 收到消息并启动
on_message()
功能
- 然后程序正在等待
on_message()
函数结束,然后再接收任何新消息,这需要几分钟,并使第二条消息延迟,依此类推
我明白它为什么这样做,因为 await on_message()
清楚地说:等待 on_message() 结束,这样它就不会返回以收听新消息。我不知道的是,我如何处理消息而不必等待此函数结束。
我的 on_message()
函数有很多空闲时间和一些 await asyncio.sleep(1)
,所以我知道我可以 运行 同时执行多项任务。
那么,我如何才能在 运行 处理第一条消息的同时继续收听新消息?
总之,你需要把await on_message(await ws.recv())
改成asyncio.create_task(on_message(await ws.recv()))
。
正如您正确指出的那样,await
对您不起作用,因为它意味着等待任务完成。虽然代码是异步的,但从某种意义上说,它是由事件循环驱动的,并且您可以并行启动多个此类任务,但每个单独的循环都是顺序的。
await
的替代方法是使用 asyncio.create_task()
在后台生成作业。这将创建一个任务,该任务将分块执行协程(每个块在两个等待暂停之间)散布在其他活动协程的等效块中。 create_task()
将 return 任务的句柄,您可以(并且可能在某些时候应该) await 等待任务完成并获取其结果或异常。因为在你的情况下你不关心结果,你甚至不需要存储任务。
如果有人可以在 Python 和 async/await 方面帮助我,我们将不胜感激!
我需要监听 websocket 的消息,所以我设置了以下代码:
import websockets
import asyncio
my_socket = "ws://......."
# I set a "while True" here to reconnect websocket if it stop for any reason
while True:
try:
async with websockets.connect(my_socket) as ws:
# I set a "while True" here to keep listening to messages forever
while True:
await on_message(await ws.recv())
# If websocket gets closed for any reason, we catch exception and wait before new loop
except Exception as e:
print(e)
# Wait 10 secs before new loop to avoid flooding server if it is unavailable for any reason
await asyncio.sleep(10)
async def on_message(message):
# Do what needs to be done with received message
# This function is running for a few minutes, with a lot of sleep() time in it..
# .. so it does no hold process for itself
我想做的是:
- 收听消息
- 收到消息后,立即使用
on_message()
功能应用各种操作,持续几分钟 - 在
on_message()
处理之前的消息时继续收听消息
实际发生了什么:
- 收听消息
- 收到消息并启动
on_message()
功能 - 然后程序正在等待
on_message()
函数结束,然后再接收任何新消息,这需要几分钟,并使第二条消息延迟,依此类推
我明白它为什么这样做,因为 await on_message()
清楚地说:等待 on_message() 结束,这样它就不会返回以收听新消息。我不知道的是,我如何处理消息而不必等待此函数结束。
我的 on_message()
函数有很多空闲时间和一些 await asyncio.sleep(1)
,所以我知道我可以 运行 同时执行多项任务。
那么,我如何才能在 运行 处理第一条消息的同时继续收听新消息?
总之,你需要把await on_message(await ws.recv())
改成asyncio.create_task(on_message(await ws.recv()))
。
正如您正确指出的那样,await
对您不起作用,因为它意味着等待任务完成。虽然代码是异步的,但从某种意义上说,它是由事件循环驱动的,并且您可以并行启动多个此类任务,但每个单独的循环都是顺序的。
await
的替代方法是使用 asyncio.create_task()
在后台生成作业。这将创建一个任务,该任务将分块执行协程(每个块在两个等待暂停之间)散布在其他活动协程的等效块中。 create_task()
将 return 任务的句柄,您可以(并且可能在某些时候应该) await 等待任务完成并获取其结果或异常。因为在你的情况下你不关心结果,你甚至不需要存储任务。