await run_in_executor() 在同步函数终止时无法解锁
await run_in_executor() fails to unblock upon termination of synchronous function
考虑以下简单、自包含的 Python3 程序:
import asyncio
import multiprocessing
import time
loop = asyncio.get_event_loop()
def sub_process(event, i):
async def async_function():
await loop.run_in_executor(None, event.wait) # <-- problematic line
print(f"successfully awaited on process {i}!") # <-- not all processes reach this line!
loop.run_until_complete(async_function())
if __name__ == '__main__':
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=sub_process, args=(event, i)) for i in range(multiprocessing.cpu_count())]
for process in processes:
process.start()
time.sleep(2)
event.set()
for process in processes:
process.join()
print("success.")
我遇到非确定性故障,其中一些(但不是全部)进程无法继续通过“有问题的”线,即,即使在事件设置之后。未能解锁的进程数也是不确定的(在我的 16vCPU AWS 实例上通常在 12-16 之间)。我 仅 能够在 Linux 机器(不是 MacOS)上重现此内容。
asyncio
比 multiprocessing
更像是一个问题。我可以这样说,因为如果我用一个简单的同步函数替换 event.wait
,它包装了 event.wait()
,同步函数 会 继续通过 event.wait()
调用,即:
def sub_process(event, i):
def sync_function():
event.wait()
print("this line is reached...")
async def async_function():
await loop.run_in_executor(None, sync_function)
print("...but this line is not.")
loop.run_until_complete(async_function())
当恰好有一个进程未能解锁并且我通过调用 ctrl+C 终止时,我得到以下堆栈跟踪:
Process Process-16:
Traceback (most recent call last):
File "temp.py", line 21, in <module>
process.join()
File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
Traceback (most recent call last):
pid, sts = os.waitpid(self.pid, flag)
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
KeyboardInterrupt
File "temp.py", line 10, in sub_process
loop.run_until_complete(async_function())
File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
self.run_forever()
File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
self._run_once()
File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib/python3.8/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
如有任何见解,我们将不胜感激!谢谢。
编辑: 在 if __name__ == '__main__':
正下方添加 multiprocessing.set_start_method('spawn')
似乎可以解决问题;谢谢@user4815162342。我还是想多了解一下是怎么回事。
Self-answering:在 if __name__ == '__main__'
正下方添加 multiprocessing.set_start_method('spawn')
可解决问题。感谢@user4815162342 指出这一点。
原因似乎与 low-level 与 multi-threading 的不兼容性和分叉有关;例如,“请注意,安全地分叉多线程进程是有问题的。” (来自 multiprocessing docs). Note that, under the hood, loop.run_in_executor
uses a ThreadPoolExecutor
;这就是程序首先是 multi-threaded 的原因(可能不会立即显现出来)。
https://bugs.python.org/issue33725 似乎提供了更多详细信息。虽然我基本上忘记了细节,但 MacOS 似乎没有提供良好的 low-level 对分叉 multi-threaded 进程的支持。
TL;DR 将 multiprocessing.set_start_method('spawn')
放在脚本的顶部以修复 Linux 上的问题。该问题无法在 MacOS 上重现,因为 spawn
是那里的默认启动方法。
Unix-like 操作系统本身提供 fork primitive for parallelism. A call to fork()
efficiently duplicates the current process and continues executing both. fork()
is still widely used to execute other programs, where a call to fork()
is immediately followed by exec()
。如今,forking 很少用于并行性,主要被 multi-threading 取代,其中内存由所有线程共享并且不需要昂贵的 inter-process 通信。然而,fork()
似乎是 multiprocessing
的完美工具,它需要多个单独的进程 运行ning 同一个可执行文件(Python 解释器),这正是 fork 提供的。事实上,分叉的子进程在 parent 停止的地方继续进行也是一个好处,因为 parent 中加载的所有 Python 模块都会自动出现在 child 中。
在 Unix-like 系统上 multiprocessing
通过分叉当前进程并继续执行 运行 侦听输入队列的代码来创建工作进程,准备执行 user-supplied 作业.在不提供 fork()
的 Windows 上,它使用了不同的方法:它通过执行一个全新的 python.exe
创建一个 worker,使用 command-line 参数和环境变量来指示它 bootstrap 进入多处理工作程序。 spawn-based 方法的缺点是初始化工作线程池的速度要慢得多,因为它需要为池中的每个工作线程创建一个全新的 Python 实例,这对于导入的进程来说可能会花费很多时间大型图书馆。它还占用更多内存,因为与部署 copy-on-write 以保存重复内存页的 fork 不同,运行 新的可执行文件在 parent 和 [=63= 之间几乎没有任何共享].另一方面,产卵的好处是每个工人都从一个完全干净的石板开始,事实证明这是至关重要的。
碰巧,分叉与线程的交互非常糟糕,因为 fork()
仅复制调用 fork()
的线程。任何创建线程的代码都不会在 child 中找到它,但它的数据结构会指示线程已成功创建。即使您编写的代码不是多线程的,这也会对您产生影响——使用创建辅助线程的库作为实现细节就足够了。分叉还与互斥量交互:当不相关的线程持有互斥量时,可能会发生对 fork()
的调用。当 child 进程调入需要互斥锁的代码并试图获取它时,它将死锁。 POSIX 试图提供方法 mitigate these issues, but these workarounds are extremely difficult and often downright impossible to use correctly and consistently. The Apple-provided MacOS system libraries don't even try, so Python devs gave up and just made spawning the default multiprocessing worker start method on MacOS. And as of Python 3.4, multiprocessing.set_start_method
可用于在任何 OS.
上请求 spawn-based worker 创建方法
除了与线程交互之外,asyncio 还对 fork 提出了自己的一系列挑战。异步事件循环使用线程池来阻塞内部使用的操作,这可能在 child 中被破坏。它使用管道来实现 call_soon_threadsafe
(并通过扩展 run_in_executor
,它使用相同的机制在作业完成时提醒事件循环),以及 asyncio-safe 信号处理程序。这些管道由 child 进程继承,因此它对管道的写入最终可能会被 parent 甚至可能是其兄弟进程接收。尝试在分叉的 child 中使用 相同的 事件循环几乎肯定是错误的。可能有用的是创建一个新的事件循环,但很可能这种用法从未被开发人员系统地测试过,实际上不受支持。
如果要在worker中使用asyncio,建议切换到spawn
或forkserver
方法来创建multiprocessing worker
考虑以下简单、自包含的 Python3 程序:
import asyncio
import multiprocessing
import time
loop = asyncio.get_event_loop()
def sub_process(event, i):
async def async_function():
await loop.run_in_executor(None, event.wait) # <-- problematic line
print(f"successfully awaited on process {i}!") # <-- not all processes reach this line!
loop.run_until_complete(async_function())
if __name__ == '__main__':
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=sub_process, args=(event, i)) for i in range(multiprocessing.cpu_count())]
for process in processes:
process.start()
time.sleep(2)
event.set()
for process in processes:
process.join()
print("success.")
我遇到非确定性故障,其中一些(但不是全部)进程无法继续通过“有问题的”线,即,即使在事件设置之后。未能解锁的进程数也是不确定的(在我的 16vCPU AWS 实例上通常在 12-16 之间)。我 仅 能够在 Linux 机器(不是 MacOS)上重现此内容。
asyncio
比 multiprocessing
更像是一个问题。我可以这样说,因为如果我用一个简单的同步函数替换 event.wait
,它包装了 event.wait()
,同步函数 会 继续通过 event.wait()
调用,即:
def sub_process(event, i):
def sync_function():
event.wait()
print("this line is reached...")
async def async_function():
await loop.run_in_executor(None, sync_function)
print("...but this line is not.")
loop.run_until_complete(async_function())
当恰好有一个进程未能解锁并且我通过调用 ctrl+C 终止时,我得到以下堆栈跟踪:
Process Process-16:
Traceback (most recent call last):
File "temp.py", line 21, in <module>
process.join()
File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
Traceback (most recent call last):
pid, sts = os.waitpid(self.pid, flag)
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
KeyboardInterrupt
File "temp.py", line 10, in sub_process
loop.run_until_complete(async_function())
File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
self.run_forever()
File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
self._run_once()
File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib/python3.8/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
如有任何见解,我们将不胜感激!谢谢。
编辑: 在 if __name__ == '__main__':
正下方添加 multiprocessing.set_start_method('spawn')
似乎可以解决问题;谢谢@user4815162342。我还是想多了解一下是怎么回事。
Self-answering:在 if __name__ == '__main__'
正下方添加 multiprocessing.set_start_method('spawn')
可解决问题。感谢@user4815162342 指出这一点。
原因似乎与 low-level 与 multi-threading 的不兼容性和分叉有关;例如,“请注意,安全地分叉多线程进程是有问题的。” (来自 multiprocessing docs). Note that, under the hood, loop.run_in_executor
uses a ThreadPoolExecutor
;这就是程序首先是 multi-threaded 的原因(可能不会立即显现出来)。
https://bugs.python.org/issue33725 似乎提供了更多详细信息。虽然我基本上忘记了细节,但 MacOS 似乎没有提供良好的 low-level 对分叉 multi-threaded 进程的支持。
TL;DR 将 multiprocessing.set_start_method('spawn')
放在脚本的顶部以修复 Linux 上的问题。该问题无法在 MacOS 上重现,因为 spawn
是那里的默认启动方法。
Unix-like 操作系统本身提供 fork primitive for parallelism. A call to fork()
efficiently duplicates the current process and continues executing both. fork()
is still widely used to execute other programs, where a call to fork()
is immediately followed by exec()
。如今,forking 很少用于并行性,主要被 multi-threading 取代,其中内存由所有线程共享并且不需要昂贵的 inter-process 通信。然而,fork()
似乎是 multiprocessing
的完美工具,它需要多个单独的进程 运行ning 同一个可执行文件(Python 解释器),这正是 fork 提供的。事实上,分叉的子进程在 parent 停止的地方继续进行也是一个好处,因为 parent 中加载的所有 Python 模块都会自动出现在 child 中。
在 Unix-like 系统上 multiprocessing
通过分叉当前进程并继续执行 运行 侦听输入队列的代码来创建工作进程,准备执行 user-supplied 作业.在不提供 fork()
的 Windows 上,它使用了不同的方法:它通过执行一个全新的 python.exe
创建一个 worker,使用 command-line 参数和环境变量来指示它 bootstrap 进入多处理工作程序。 spawn-based 方法的缺点是初始化工作线程池的速度要慢得多,因为它需要为池中的每个工作线程创建一个全新的 Python 实例,这对于导入的进程来说可能会花费很多时间大型图书馆。它还占用更多内存,因为与部署 copy-on-write 以保存重复内存页的 fork 不同,运行 新的可执行文件在 parent 和 [=63= 之间几乎没有任何共享].另一方面,产卵的好处是每个工人都从一个完全干净的石板开始,事实证明这是至关重要的。
碰巧,分叉与线程的交互非常糟糕,因为 fork()
仅复制调用 fork()
的线程。任何创建线程的代码都不会在 child 中找到它,但它的数据结构会指示线程已成功创建。即使您编写的代码不是多线程的,这也会对您产生影响——使用创建辅助线程的库作为实现细节就足够了。分叉还与互斥量交互:当不相关的线程持有互斥量时,可能会发生对 fork()
的调用。当 child 进程调入需要互斥锁的代码并试图获取它时,它将死锁。 POSIX 试图提供方法 mitigate these issues, but these workarounds are extremely difficult and often downright impossible to use correctly and consistently. The Apple-provided MacOS system libraries don't even try, so Python devs gave up and just made spawning the default multiprocessing worker start method on MacOS. And as of Python 3.4, multiprocessing.set_start_method
可用于在任何 OS.
除了与线程交互之外,asyncio 还对 fork 提出了自己的一系列挑战。异步事件循环使用线程池来阻塞内部使用的操作,这可能在 child 中被破坏。它使用管道来实现 call_soon_threadsafe
(并通过扩展 run_in_executor
,它使用相同的机制在作业完成时提醒事件循环),以及 asyncio-safe 信号处理程序。这些管道由 child 进程继承,因此它对管道的写入最终可能会被 parent 甚至可能是其兄弟进程接收。尝试在分叉的 child 中使用 相同的 事件循环几乎肯定是错误的。可能有用的是创建一个新的事件循环,但很可能这种用法从未被开发人员系统地测试过,实际上不受支持。
如果要在worker中使用asyncio,建议切换到spawn
或forkserver
方法来创建multiprocessing worker