Python 异步:运行 subprocess_exec 在工作线程上
Python asyncio: Running subprocess_exec on a worker thread
所以我使用 Python asyncio
模块(在 Linux 上)启动子进程,然后异步监视它。我的代码工作正常...当 主线程 上的 运行 时。但是当我 运行 它在工作线程上时,它挂起,并且永远不会调用 process_exited
回调。
我怀疑这实际上可能是工作线程上 运行ning subprocess_exec
的某种未记录的缺陷或问题,可能与实现如何处理后台线程中的信号有关。但也可能是我把事情搞砸了。
一个简单的、可重现的例子如下:
class MyProtocol(asyncio.SubprocessProtocol):
def __init__(self, done_future):
super().__init__()
self._done_future = done_future
def pipe_data_received(self, fd, data):
print("Received:", len(data))
def process_exited(self):
print("PROCESS EXITED!")
self._done_future.set_result(None)
def run(loop):
done_future = asyncio.Future(loop = loop)
transport = None
try:
transport, protocol = yield from loop.subprocess_exec(
lambda : MyProtocol(done_future),
"ls",
"-lh",
stdin = None
)
yield from done_future
finally:
if transport: transport.close()
return done_future.result()
def run_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
try:
return loop.run_until_complete(run(loop))
finally:
loop.close()
所以在这里,我设置了一个asyncio
事件循环来执行shell命令ls -lh
,然后在从子进程接收到数据时触发回调,以及另一个回调因为当子进程退出时。
如果我直接在 Python 程序的主线程中调用 run_loop()
,一切都很好。但是如果我说:
t = threading.Thread(target = run_loop)
t.start()
t.join()
然后发生的事情是 pipe_data_received()
回调被成功调用,但是 process_exited()
从未被调用,程序只是挂起。
在谷歌搜索并查看 asyncio
实现 unix_events.py
的源代码后,我发现可能需要手动将我的事件循环附加到全局 "child watcher" 对象,如下:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
asyncio.get_child_watcher().attach_loop(loop)
显然,子观察者是一个(未记录的)对象,负责在幕后调用 waitpid
(或类似的东西)。但是当我尝试这个,并在后台线程中 运行 run_event_loop()
时,我得到了错误:
File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread
所以这里看起来实现实际上做了一个检查以确保信号处理程序只能在主线程上使用,这让我相信在当前的实现中,在后台使用 subprocess_exec
事实上,如果不更改 Python 源代码本身,线程是根本不可能的。
我说得对吗?可悲的是,asyncio
模块的文档非常少,所以我很难对我在这里的结论充满信心。我可能只是做错了什么。
只要主线程中的 asyncio 循环 运行 及其子观察器已实例化,在工作线程中处理子进程就可以了:
asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, run_loop)
loop.run_until_complete(coro)
见 and the documentation。
所以我使用 Python asyncio
模块(在 Linux 上)启动子进程,然后异步监视它。我的代码工作正常...当 主线程 上的 运行 时。但是当我 运行 它在工作线程上时,它挂起,并且永远不会调用 process_exited
回调。
我怀疑这实际上可能是工作线程上 运行ning subprocess_exec
的某种未记录的缺陷或问题,可能与实现如何处理后台线程中的信号有关。但也可能是我把事情搞砸了。
一个简单的、可重现的例子如下:
class MyProtocol(asyncio.SubprocessProtocol):
def __init__(self, done_future):
super().__init__()
self._done_future = done_future
def pipe_data_received(self, fd, data):
print("Received:", len(data))
def process_exited(self):
print("PROCESS EXITED!")
self._done_future.set_result(None)
def run(loop):
done_future = asyncio.Future(loop = loop)
transport = None
try:
transport, protocol = yield from loop.subprocess_exec(
lambda : MyProtocol(done_future),
"ls",
"-lh",
stdin = None
)
yield from done_future
finally:
if transport: transport.close()
return done_future.result()
def run_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
try:
return loop.run_until_complete(run(loop))
finally:
loop.close()
所以在这里,我设置了一个asyncio
事件循环来执行shell命令ls -lh
,然后在从子进程接收到数据时触发回调,以及另一个回调因为当子进程退出时。
如果我直接在 Python 程序的主线程中调用 run_loop()
,一切都很好。但是如果我说:
t = threading.Thread(target = run_loop)
t.start()
t.join()
然后发生的事情是 pipe_data_received()
回调被成功调用,但是 process_exited()
从未被调用,程序只是挂起。
在谷歌搜索并查看 asyncio
实现 unix_events.py
的源代码后,我发现可能需要手动将我的事件循环附加到全局 "child watcher" 对象,如下:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
asyncio.get_child_watcher().attach_loop(loop)
显然,子观察者是一个(未记录的)对象,负责在幕后调用 waitpid
(或类似的东西)。但是当我尝试这个,并在后台线程中 运行 run_event_loop()
时,我得到了错误:
File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread
所以这里看起来实现实际上做了一个检查以确保信号处理程序只能在主线程上使用,这让我相信在当前的实现中,在后台使用 subprocess_exec
事实上,如果不更改 Python 源代码本身,线程是根本不可能的。
我说得对吗?可悲的是,asyncio
模块的文档非常少,所以我很难对我在这里的结论充满信心。我可能只是做错了什么。
只要主线程中的 asyncio 循环 运行 及其子观察器已实例化,在工作线程中处理子进程就可以了:
asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, run_loop)
loop.run_until_complete(coro)
见