在单独的线程中调用 asyncio.run() 时出现 ValueError

ValueError when asyncio.run() is called in separate thread

我有一个正在侦听多个套接字的网络应用程序。 为了单独处理每个套接字,我使用 Python 的 threading.Thread 模块。

这些套接字必须能够 运行 tasks 接收数据包,而不会延迟来自套接字处理 thread.

的任何进一步数据包接收

为此,我已经使用关键字 async 声明了 运行 前面提到的 tasks 的方法,这样我就可以 运行它们与 asyncio.run(my_async_task(my_parameters)).

异步

我已经在单个套接字(运行ning 在 main thread 上)测试了这种方法并取得了巨大成功。 但是当我使用多个套接字时(每个套接字都是独立的 handler thread),会引发以下异常:

ValueError: set_wakeup_fd only works in main thread

我的问题如下:asyncio 是满足我需要的合适工具吗?如果是,我如何从非主线程的线程中 运行 async 方法。

我的大部分搜索结果都包括 "event loops" 和 "awaiting" assync 结果,这(如果我正确理解这些结果)不是我要找的。

我在这个问题中谈论套接字以提供上下文,但我的问题主要是关于 asynciochild threads 中的行为。

如果需要,我可以编写一个简短的代码示例来重现错误。 感谢您的帮助!

Edit1,这是一个最小的可重现代码示例:

import asyncio
import threading
import time


# Handle a specific packet from any socket without interrupting the listenning thread
async def handle_it(val):
    print("handled: {}".format(val))  


# A class to simulate a threaded socket listenner
class MyFakeSocket(threading.Thread):
    def __init__(self, val):
        threading.Thread.__init__(self)
        self.val = val  # Value for a fake received packet

    def run(self):
        for i in range(10):
            # The (fake) socket will sequentially receive [val, val+1, ... val+9]
            asyncio.run(handle_it(self.val + i))
            time.sleep(0.5)


# Entry point
sockets = MyFakeSocket(0), MyFakeSocket(10)
for socket in sockets:
    socket.start()

这可能与此处讨论的错误有关:https://bugs.python.org/issue34679

如果是这样,这将是 windows 上 python 3.8 的问题。要解决此问题,您可以尝试降级到 python 3.7,其中不包括 asyncio.main,因此您需要手动获取和 运行 事件循环,例如:

loop = asyncio.get_event_loop()
loop.run_until_complete(<your tasks>)
loop.close()

否则,您能否运行 将代码放入docker 容器中?这可能对您有用,然后会脱离 OS 行为,但需要做更多的工作!