在 Python FastAPI 中使用 websockets 并行发送/接收

Send / receive in parallel using websockets in Python FastAPI

我会尝试用一个例子来解释我在做什么,比如说我正在构建一个天气客户端。浏览器通过websocket发送消息,例如:

{
  "city": "Chicago",
  "country": "US"
}

服务器每 5 分钟查询一次天气,并用最新数据更新浏览器。

现在浏览器可以发送另一条消息,例如:

{
  "city": "Bangalore",
  "country": "IN"
}

现在我的服务器应该停止更新芝加哥的天气详细信息并开始更新有关班加罗尔的详细信息,即同时通过 websocket 发送/接收消息。我应该如何实施它?

目前我有这个,但这只会在收到事件时更新浏览器:

@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    weather_client = WeatherClient(client)
    while True:
        data = await websocket.receive_json()
        weather = await weather_client.weather(data)
        await websocket.send_json(weather.dict())

如果我将 websocket.receive_json() 移动到循环之外,我将无法继续收听来自浏览器的消息。我想我需要启动两个 asyncio 任务,但由于我是异步编程方式的新手,所以我不太能够确定实现。

执行此操作的最简单方法就像您提到的在单独的任务中将读数移到循环之外。在此范例中,您需要使用最新数据更新局部变量,使您的代码看起来像这样:

@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    json_data = await websocket.receive_json()

    async def read_from_socket(websocket: WebSocket):
        nonlocal json_data
        async for data in websocket.iter_json():
            json_data = data

    asyncio.create_task(read_from_socket(websocket))
    while True:
        print(f"getting weather data for {json_data}")
        await asyncio.sleep(1)  # simulate a slow call to the weather service

请注意,我使用了 iter_json 异步生成器,这相当于 receive_json 上的无限循环。

这可以工作,但根据您的要求可能会有错误。想象一下,天气服务需要 10 秒才能完成,在这段时间内,用户通过套接字发送了三个针对不同城市的请求。在上面的代码中,您只会获得用户发送的最新城市。这对您的应用程序可能没问题,但如果您需要跟踪用户发送的所有内容,则需要使用队列。在此范例中,您将有一个任务读取数据并将其放入队列,还有一个任务从队列中获取数据并查询天气服务。然后,您将 运行 这些与 gather 同时进行。

@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.queues.Queue()

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_json():
            print(f"putting {data} in the queue")
            queue.put_nowait(data)

    async def get_data_and_send():
        data = await queue.get()
        while True:
            if queue.empty():
                print(f"getting weather data for {data}")
                await asyncio.sleep(1)
            else:
                data = queue.get_nowait()
                print(f"Setting data to {data}")

    await asyncio.gather(read_from_socket(websocket), get_data_and_send())

这样,您就不会丢失用户发送的数据。在上面的示例中,我只获取最新用户请求的天气数据,但您仍然可以访问所有发送的数据。

编辑:为了在评论中回答您的问题,队列方法可能是在新请求进入时取消任务的最佳方法。基本上将您希望能够取消的长运行宁任务移动到它自己的协程函数(在这个例子中 read_and_send_to_client)和 运行 它作为一个任务。当有新数据进来时,如果那个任务没有完成,取消它,然后创建一个新的。

async def read_and_send_to_client(data):
    print(f'reading {data} from client')
    await asyncio.sleep(10) # simulate a slow call
    print(f'finished reading {data}, sending to websocket client')


@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.queues.Queue()

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_json():
            print(f"putting {data} in the queue")
            queue.put_nowait(data)

    async def get_data_and_send():
        data = await queue.get()
        fetch_task = asyncio.create_task(read_and_send_to_client(data))
        while True:
            data = await queue.get()
            if not fetch_task.done():
                print(f'Got new data while task not complete, canceling.')
                fetch_task.cancel()
            fetch_task = asyncio.create_task(read_and_send_to_client(data))

    await asyncio.gather(read_from_socket(websocket), get_data_and_send())