Python 异步:运行 subprocess_exec 在工作线程上

Python asyncio: Running subprocess_exec on a worker thread

所以我使用 Python asyncio 模块(在 Linux 上)启动子进程,然后异步监视它。我的代码工作正常...当 主线程 上的 运行 时。但是当我 运行 它在工作线程上时,它挂起,并且永远不会调用 process_exited 回调。

我怀疑这实际上可能是工作线程上 运行ning subprocess_exec 的某种未记录的缺陷或问题,可能与实现如何处理后台线程中的信号有关。但也可能是我把事情搞砸了。

一个简单的、可重现的例子如下:

class MyProtocol(asyncio.SubprocessProtocol):
    def __init__(self, done_future):
        super().__init__()
        self._done_future = done_future

    def pipe_data_received(self, fd, data):
        print("Received:", len(data))

    def process_exited(self):
        print("PROCESS EXITED!")
        self._done_future.set_result(None)

def run(loop):
    done_future = asyncio.Future(loop = loop)
    transport = None
    try:
        transport, protocol = yield from loop.subprocess_exec(
            lambda : MyProtocol(done_future),
            "ls",
            "-lh",
            stdin = None
        )
        yield from done_future
    finally:
        if transport: transport.close()

    return done_future.result()

def run_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop) # bind event loop to current thread

    try:
        return loop.run_until_complete(run(loop))
    finally:
        loop.close()

所以在这里,我设置了一个asyncio事件循环来执行shell命令ls -lh,然后在从子进程接收到数据时触发回调,以及另一个回调因为当子进程退出时。

如果我直接在 Python 程序的主线程中调用 run_loop(),一切都很好。但是如果我说:

t = threading.Thread(target = run_loop)
t.start()
t.join()

然后发生的事情是 pipe_data_received() 回调被成功调用,但是 process_exited() 从未被调用,程序只是挂起。

在谷歌搜索并查看 asyncio 实现 unix_events.py 的源代码后,我发现可能需要手动将我的事件循环附加到全局 "child watcher" 对象,如下:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
asyncio.get_child_watcher().attach_loop(loop)

显然,子观察者是一个(未记录的)对象,负责在幕后调用 waitpid(或类似的东西)。但是当我尝试这个,并在后台线程中 运行 run_event_loop() 时,我得到了错误:

  File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
    raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread

所以这里看起来实现实际上做了一个检查以确保信号处理程序只能在主线程上使用,这让我相信在当前的实现中,在后台使用 subprocess_exec事实上,如果不更改 Python 源代码本身,线程是根本不可能的。

我说得对吗?可悲的是,asyncio 模块的文档非常少,所以我很难对我在这里的结论充满信心。我可能只是做错了什么。

只要主线程中的 asyncio 循环 运行 及其子观察器已实例化,在工作线程中处理子进程就可以了:

asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, run_loop)
loop.run_until_complete(coro)

and the documentation