如何在线程中使用 websockets 服务器并正确关闭线程并关闭事件循环?

How to use websockets server in a thread and properly close the thread and shut down event loop?

我的设计在这里可能是错误的,但我还是会问的。目前,我正在使用 websockets 创建一个 websocket 服务器。我在我的程序中执行很多其他工作,并且希望 websocket 服务器在它自己的线程中。我创建了一个 class 子 class es threading.Thread 并覆盖了 运行 方法。这个 class 的停止方法是我关闭 websocket 服务器,并停止并关闭我创建的事件循环的地方。

class MyWsServer(threading.Thread):
    def __init__(self, address, port):
        threading.Thread.__init__(self)
        self.port = port
        self.address = address
        self.server = None
        self.running = False
        self.loop = None

    def start_ws_server(self):
        self.start()

    def run(self):
        if not self.loop:
            self.loop = asyncio.new_event_loop()
        ws_server = websockets.serve(self.ws_handler, self.address, self.port,
                                     ping_timeout=None, ping_interval=None, loop=self.loop)
        self.running = True
        self.server = ws_server
        self.loop.run_until_complete(self.server)
        self.loop.run_forever()

    def stop_ws_server(self):
        self.running = False
        self.server.ws_server.close()
        self.loop.stop()
        self.loop.close()

    async def ws_handler(self, websocket, path):
        while self.running:
            print(self.running)
            # simulate work
            print("doing some work")
            sleep(5)
            print("Sending data")
            data = json.dumps({"test": "test test"})
            try:
                await websocket.send(data)
                result = await websocket.recv()
                print(result)
                result = json.loads(result)
                print(f"json: {result}")
            except websockets.ConnectionClosed:
                print(f"Terminated")
                break
        print("out of ws_handler")

此 class 实例可在程序的其他地方访问,并且在调用 stop() 方法时,我收到有关 self.loop.close() 的以下错误。堆栈跟踪中没有什么值得一提的。

  File "C:\Users\User\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 674, in close
    raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop

所以我的问题是:

  1. 为什么 self.loop.stop() 不停止事件循环?
  2. 是否有更好的解决方案来停止事件循环和最终目标,停止线程?

另外请注意,我不认为在 stop_ws_server() 中将 self.running 设置为 False 有任何作用,因为该线程大部分时间应该停留在 result = await websocket.recv() .我似乎没有退出 websocket 处理程序的干净方法。 stop_ws_server() 中的最后三行对我来说似乎相当暴力。

我还在 Stack Overflow 上看到一些 post 和关于 asyncio 对象,它们不是线程安全的,还有很多 post 推荐对 asyncio 的某种用法 loop.call_soon_threadsafe()。不过,我不确定如何在当前设置中使用它。

嗯,非常感谢您的帮助。谢谢。如果事情需要更多说明,请告诉我。

嗯,这对我来说似乎很老套,但我从 Stack Overflow 上的另一个解决方案中获得了指导。这里是使用的答案,。新代码如下。我也从以前的代码中删除了一些行,例如现在未使用的变量和打印语句。

class MyWsServer(Process):
    def __init__(self, address, port):
        super().__init__()
        self.port = port
        self.address = address

    def run(self):
        loop = asyncio.new_event_loop()
        ws_server = websockets.serve(self.ws_handler, self.address, self.port,
                                     ping_timeout=None, ping_interval=None, loop=loop)
        loop.run_until_complete(ws_server)
        loop.run_forever()

    async def ws_handler(self, websocket, path):
        while True:
            # simulate work
            print("doing some work")
            sleep(5)
            print("Sending data")
            data = json.dumps({"test": "test test"})
            try:
                await websocket.send(data)
                result = await websocket.recv()
                print(result)
                result = json.loads(result)
                print(f"json: {result}")
            except websockets.ConnectionClosed:
                print(f"Terminated")
                break

为了启动该进程,我在我在代码其他地方创建的 WsListener class 实例上调用 start() 并调用 terminate() 终止该进程。我希望有一种更优雅的方式来终止事件循环和进程中创建的其他资源,但我还没有找到更好的方式。

编辑:忘了说我现在正在使用 Multiprocessing