为传入请求保持 Websockets 连接打开

Keep Websockets connection open for incoming requests

我有一个接受来自客户端的 HTTP 请求的 Flask 服务器。此 HTTP 服务器需要使用 websocket 连接将工作委托给第三方服务器(出于性能原因)。

我发现很难全神贯注于如何创建可以为 HTTP 请求保持打开状态的永久 websocket 连接。在 运行-once 脚本中向 websocket 服务器发送请求工作正常,看起来像这样:

async def send(websocket, payload):
    await websocket.send(json.dumps(payload).encode("utf-8"))

async def recv(websocket):
    data = await websocket.recv()
    return json.loads(data)

async def main(payload):
    uri = f"wss://the-third-party-server.com/xyz"
    async with websockets.connect(uri) as websocket:
        future = send(websocket, payload)
        future_r = recv(websocket)
        _, output = await asyncio.gather(future, future_r)
    return output

asyncio.get_event_loop().run_until_complete(main({...}))

在这里,main() 建立了一个 WSS 连接并在完成后将其关闭,但是我如何才能为传入的 HTTP 请求保持该连接打开,这样我就可以为每个请求调用 main() 而无需重新正在建立 WSS 连接?

这里的主要问题是,当您编写响应 http(s) 的 Web 应用程序时,您的代码有一个非常特殊的“生命周期”:通常您有一个“查看”功能,可以获取请求数据,执行收集响应数据和 return 所需的所有操作。

大多数网络框架中的这个“视图”功能必须独立于系统的其余部分——它应该能够履行其职责,而不依赖于它在调用时获得的数据或对象——这是请求数据和系统配置 - 使应用程序服务器(旨在将您的程序实际连接到 Internet 的框架部分)可以选择多种方式来为您的程序提供服务:它们可能 运行 您的视图功能在多个并行线程,或者在多个并行进程中,甚至在各种容器或物理服务器中的不同进程中:您的应用程序不需要关心这些。

如果您想要在 调用您的视图函数时可用的资源,您需要打破这种范式。例如,通常情况下,框架会希望创建一个数据库连接池,以便同一进程上的视图可以重用这些连接。这些数据库连接通常由框架本身提供,该框架实现了一种允许重用的机制,并根据需要以透明的方式提供。如果您想保持 websocket 连接有效,您必须重新创建 一种相同类型的机制。

在某种程度上,您需要一个 Python 对象来调解您的 websocket 数据,就像您的网络视图功能的“服务器”一样。

这比听起来更简单 - 一个特殊的 Python class 设计用于每个进程有一个实例,它保持连接,并且能够发送和接收从无需处理的并行调用就足够了。确保此实例存在于当前进程中的可调用函数足以在配置为将您的应用程序提供给网络的任何策略下工作。

如果您使用的是 Flask,它不使用 asyncio,您会变得更加复杂 - 您将失去视图中的异步能力,他们将不得不等待 websocket 请求完成 - 然后它会成为您的应用程序服务器的工作,让您在不同的线程或进程中查看以确保可用性。而且,你的工作是在一个单独的线程中为你的 websocket 运行 设置异步循环,这样它就可以发出它需要的请求。

这是一些示例代码。 请注意,除了每个进程使用一个 websocket 之外, 这没有任何形式的失败的规定,但是, 最重要的是:它不并行执行任何操作:所有 成对的 send-recv 正在阻塞,因为你没有给出任何线索 一种允许将每个传出消息配对的机制 及其回应。

import asyncio
import threading
from queue import Queue
 

class AWebSocket:
    instance = None
    def __new__(cls, *args, **kw):
        if cls.instance:
            return cls.instance
        return super().__new__(cls, *args, **kw)

    def __init__(self, *args, **kw):
        cls = self.__class__
        if cls.instance:
            # init will be called even if new finds the existing instance,
            # so we have to check again
            return 
        self.outgoing = Queue()
        self.responses = Queue()
        self.socket_thread = threading.Thread(target=self.start_socket)
        self.socket_thread.start()


    def start_socket():
        # starts an async loop in a separate thread, and keep
        # the web socket running, in this separate thread
        asyncio.get_event_loop().run_until_complete(self.core())

    def core(self):
        self.socket = websockets.connect(uri)

    async def _send(self, websocket, payload):
        await websocket.send(json.dumps(payload).encode("utf-8"))

    async def _recv(self, websocket):
        data = await websocket.recv()
        return json.loads(data)

    async def core(self):
        uri = f"wss://the-third-party-server.com/xyz"
        async with websockets.connect(uri) as websocket:
            self.websocket = websocket
            while True:
                # This code is as you wrote it: 
                # it essentially blocks until a message is sent
                # and the answer is received back. 
                # You have to have a mechanism in your websocket
                # messages allowing you to identify the corresponding
                # answer to each request. On doing so, this is trivially
                # paralellizable simply by calling asyncio.create_task 
                # instead of awaiting on asyncio.gather
                payload = self.outgoing.get()
                future = self._send(websocket, payload)
                future_r = self._recv(websocket)
                _, response = await asyncio.gather(future, future_r)
                self.responses.put(response)

    def send(self, payload):
        # This is the method you call from your views
        # simply do:
        # `output = AWebSocket().send(payload)`
        self.outgoing.put(payload)
        return self.responses.get()