await run_in_executor() 在同步函数终止时无法解锁

await run_in_executor() fails to unblock upon termination of synchronous function

考虑以下简单、自包含的 Python3 程序:

import asyncio
import multiprocessing
import time
loop = asyncio.get_event_loop()

def sub_process(event, i):
    async def async_function():
        await loop.run_in_executor(None, event.wait)  # <-- problematic line
        print(f"successfully awaited on process {i}!")  # <-- not all processes reach this line!
    loop.run_until_complete(async_function())


if __name__ == '__main__':
    event = multiprocessing.Event()
    processes = [multiprocessing.Process(target=sub_process, args=(event, i)) for i in range(multiprocessing.cpu_count())]
    for process in processes:
        process.start()
    time.sleep(2)
    event.set()
    for process in processes:
        process.join()
    print("success.")

我遇到非确定性故障,其中一些(但不是全部)进程无法继续通过“有问题的”线,即,即使在事件设置之后。未能解锁的进程数也是不确定的(在我的 16vCPU AWS 实例上通常在 12-16 之间)。我 能够在 Linux 机器(不是 MacOS)上重现此内容。

asynciomultiprocessing 更像是一个问题。我可以这样说,因为如果我用一个简单的同步函数替换 event.wait,它包装了 event.wait(),同步函数 继续通过 event.wait() 调用,即:

def sub_process(event, i):
    def sync_function():
        event.wait()
        print("this line is reached...")
    async def async_function():
        await loop.run_in_executor(None, sync_function)
        print("...but this line is not.")
    loop.run_until_complete(async_function())

当恰好有一个进程未能解锁并且我通过调用 ctrl+C 终止时,我得到以下堆栈跟踪:

Process Process-16:
Traceback (most recent call last):
  File "temp.py", line 21, in <module>
    process.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
Traceback (most recent call last):
    pid, sts = os.waitpid(self.pid, flag)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
KeyboardInterrupt
  File "temp.py", line 10, in sub_process
    loop.run_until_complete(async_function())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

如有任何见解,我们将不胜感激!谢谢。

编辑:if __name__ == '__main__': 正下方添加 multiprocessing.set_start_method('spawn') 似乎可以解决问题;谢谢@user4815162342。我还是想多了解一下是怎么回事。

Self-answering:在 if __name__ == '__main__' 正下方添加 multiprocessing.set_start_method('spawn') 可解决问题。感谢@user4815162342 指出这一点。

原因似乎与 low-level 与 multi-threading 的不兼容性和分叉有关;例如,“请注意,安全地分叉多线程进程是有问题的。” (来自 multiprocessing docs). Note that, under the hood, loop.run_in_executor uses a ThreadPoolExecutor;这就是程序首先是 multi-threaded 的原因(可能不会立即显现出来)。

https://bugs.python.org/issue33725 似乎提供了更多详细信息。虽然我基本上忘记了细节,但 MacOS 似乎没有提供良好的 low-level 对分叉 multi-threaded 进程的支持。

TL;DRmultiprocessing.set_start_method('spawn') 放在脚本的顶部以修复 Linux 上的问题。该问题无法在 MacOS 上重现,因为 spawn 是那里的默认启动方法。

Unix-like 操作系统本身提供 fork primitive for parallelism. A call to fork() efficiently duplicates the current process and continues executing both. fork() is still widely used to execute other programs, where a call to fork() is immediately followed by exec()。如今,forking 很少用于并行性,主要被 multi-threading 取代,其中内存由所有线程共享并且不需要昂贵的 inter-process 通信。然而,fork() 似乎是 multiprocessing 的完美工具,它需要多个单独的进程 运行ning 同一个可执行文件(Python 解释器),这正是 fork 提供的。事实上,分叉的子进程在 parent 停止的地方继续进行也是一个好处,因为 parent 中加载的所有 Python 模块都会自动出现在 child 中。

在 Unix-like 系统上 multiprocessing 通过分叉当前进程并继续执行 运行 侦听输入队列的代码来创建工作进程,准备执行 user-supplied 作业.在不提供 fork() 的 Windows 上,它使用了不同的方法:它通过执行一个全新的 python.exe 创建一个 worker,使用 command-line 参数和环境变量来指示它 bootstrap 进入多处理工作程序。 spawn-based 方法的缺点是初始化工作线程池的速度要慢得多,因为它需要为池中的每个工作线程创建一个全新的 Python 实例,这对于导入的进程来说可能会花费很多时间大型图书馆。它还占用更多内存,因为与部署 copy-on-write 以保存重复内存页的 fork 不同,运行 新的可执行文件在 parent 和 [=63= 之间几乎没有任何共享].另一方面,产卵的好处是每个工人都从一个完全干净的石板开始,事实证明这是至关重要的。

碰巧,分叉与线程的交互非常糟糕,因为 fork() 仅复制调用 fork() 的线程。任何创建线程的代码都不会在 child 中找到它,但它的数据结构会指示线程已成功创建。即使您编写的代码不是多线程的,这也会对您产生影响——使用创建辅助线程的库作为实现细节就足够了。分叉还与互斥量交互:当不相关的线程持有互斥量时,可能会发生对 fork() 的调用。当 child 进程调入需要互斥锁的代码并试图获取它时,它将死锁。 POSIX 试图提供方法 mitigate these issues, but these workarounds are extremely difficult and often downright impossible to use correctly and consistently. The Apple-provided MacOS system libraries don't even try, so Python devs gave up and just made spawning the default multiprocessing worker start method on MacOS. And as of Python 3.4, multiprocessing.set_start_method 可用于在任何 OS.

上请求 spawn-based worker 创建方法

除了与线程交互之外,asyncio 还对 fork 提出了自己的一系列挑战。异步事件循环使用线程池来阻塞内部使用的操作,这可能在 child 中被破坏。它使用管道来实现 call_soon_threadsafe(并通过扩展 run_in_executor,它使用相同的机制在作业完成时提醒事件循环),以及 asyncio-safe 信号处理程序。这些管道由 child 进程继承,因此它对管道的写入最终可能会被 parent 甚至可能是其兄弟进程接收。尝试在分叉的 child 中使用 相同的 事件循环几乎肯定是错误的。可能有用的是创建一个新的事件循环,但很可能这种用法从未被开发人员系统地测试过,实际上不受支持。

如果要在worker中使用asyncio,建议切换到spawnforkserver方法来创建multiprocessing worker