在多个异步循环中使用相同的 websocket 连接 (Python)
Use the same websocket connection in multiple asynchronous loops (Python)
我正在 运行 异步设置两个循环,并希望两者都可以访问同一个 websocket 连接。一个函数 periodic_fetch()
定期(每 60 秒)获取一些数据,并在满足条件时向 websocket 发送一条消息。另一个 retrieve_websocket()
从 websocket 接收消息并在满足条件时执行某些操作。截至目前,我在两个函数中都连接到 websocket,但这意味着 retrieve_websocket()
将不会收到对 periodic_fetch()
发送的 websocket 消息的响应。如何创建一个 websocket 连接并在两个循环中使用相同的连接,因为它们 运行 是异步的?我的代码:
# Imports
import asyncio
import websockets
from datetime import datetime
websocket_url = "wss://localhost:5000/"
# Simulate fetching some data
async def fetch_data():
print("Fetching started")
await asyncio.sleep(2)
return {"data": 2}
# Receive and analyze websocket data
async def retrieve_websocket():
async with websockets.connect(websocket_url) as ws:
while True:
msg = await ws.recv()
print(msg)
# Perform some task if condition is met
# Periodically fetch data and send messages to websocket
async def periodic_fetch():
async with websockets.connect(websocket_url) as ws:
while True:
print(datetime.now())
fetch_task = asyncio.create_task(fetch_data())
wait_task = asyncio.create_task(asyncio.sleep(60))
res = await fetch_task
# Send message to websocket
await ws.send("Websocket message")
# Wait the remaining wait duration
await wait_task
loop = asyncio.get_event_loop()
cors = asyncio.wait([periodic_fetch(), retrieve_websocket()])
loop.run_until_complete(cors)
解决方案是在一个单独的函数中打开连接并使用 asyncio.gather()
以 websocket 作为参数传入两个函数。
async def run_script():
async with websockets.connect(websocket_url) as ws:
await asyncio.gather(periodic_fetch(ws), retrieve_websocket(ws))
asyncio.run(run_script())
我正在 运行 异步设置两个循环,并希望两者都可以访问同一个 websocket 连接。一个函数 periodic_fetch()
定期(每 60 秒)获取一些数据,并在满足条件时向 websocket 发送一条消息。另一个 retrieve_websocket()
从 websocket 接收消息并在满足条件时执行某些操作。截至目前,我在两个函数中都连接到 websocket,但这意味着 retrieve_websocket()
将不会收到对 periodic_fetch()
发送的 websocket 消息的响应。如何创建一个 websocket 连接并在两个循环中使用相同的连接,因为它们 运行 是异步的?我的代码:
# Imports
import asyncio
import websockets
from datetime import datetime
websocket_url = "wss://localhost:5000/"
# Simulate fetching some data
async def fetch_data():
print("Fetching started")
await asyncio.sleep(2)
return {"data": 2}
# Receive and analyze websocket data
async def retrieve_websocket():
async with websockets.connect(websocket_url) as ws:
while True:
msg = await ws.recv()
print(msg)
# Perform some task if condition is met
# Periodically fetch data and send messages to websocket
async def periodic_fetch():
async with websockets.connect(websocket_url) as ws:
while True:
print(datetime.now())
fetch_task = asyncio.create_task(fetch_data())
wait_task = asyncio.create_task(asyncio.sleep(60))
res = await fetch_task
# Send message to websocket
await ws.send("Websocket message")
# Wait the remaining wait duration
await wait_task
loop = asyncio.get_event_loop()
cors = asyncio.wait([periodic_fetch(), retrieve_websocket()])
loop.run_until_complete(cors)
解决方案是在一个单独的函数中打开连接并使用 asyncio.gather()
以 websocket 作为参数传入两个函数。
async def run_script():
async with websockets.connect(websocket_url) as ws:
await asyncio.gather(periodic_fetch(ws), retrieve_websocket(ws))
asyncio.run(run_script())