使用 cython 使用 dask 或 joblib 多处理编译可执行文件会导致错误

Compiling Executable with dask or joblib multiprocessing with cython results in errors

我正在将一些串行处理的 python 作业转换为使用 dask 或 joblib 的多处理。遗憾的是我需要继续 windows.
当 运行 从 IPython 中或从命令行使用 python 调用 py 文件时,一切都 运行 正常。
使用 cython 编译可执行文件时,它不再 运行 正常:越来越多的进程(无限且大于请求的进程数)开始启动并阻塞我的系统。
它在某种程度上感觉像 Multiprocessing Bomb - 但当然我使用 if __name__=="__main__:" 来获得控制块 - 由 python 在命令行调用的罚款 运行 批准。
我的 cython 调用是 cython --embed --verbose --annotate THECODE.PY,我正在使用 gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE 进行编译,生成 windows 可执行文件 THECODE.exe.
使用其他(单一处理)代码 运行 没问题。
dask 和 joblib 的问题似乎是一样的(这可能意味着 dask 的工作方式类似于或基于 joblib)。
有什么建议吗?

对于那些对 mcve 感兴趣的人:只需从 Multiprocessing Bomb 中获取第一个代码并使用我上面的 cython 命令对其进行编译将导致可执行文件破坏您的系统。 (我刚试过:-))

我刚刚通过在代码示例中添加一行来显示 __name__:

发现了一些有趣的东西
import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

print("-->" + __name__ + "<--")
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

当运行那段带有python的代码显示

__main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__

(其他输出被抑制)。解释 if 决定是否有效。 运行 cython 编译后的可执行文件显示

__main__
__main__
__main__
__main__
__main__
__main__

越来越多。因此,worker 对模块的调用不再像导入那样 masqueraded,因此每个 worker 都会尝试以递归方式启动五个新模块。

当启动一个新的 python-process multiprocessing-module 在 Windows 上使用 spawn-method(此行为也可以在 Linux 上触发通过使用 mp.set_start_method('spawn').

在新进程中将命令行参数传递给解释器,因此可以建立与父进程的通信,例如:

 python -c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork

嵌入式 cython 模块(或一般情况下冻结(即使用 cx_Freeze、py2exe 和类似模块创建)模块的问题),将命令行参数传递给它们更符合

python my_script.py <arguments>

即interpeter 不会自动处理命令行,但需要在脚本中处理。

multiprocessing 提供了一个名为 multiprocessing.freeze_support(), which handles the command line arguments correctly and which can be used as shown in :

的函数
if __name__ == '__main__':
    # needed for Cython, as it doesn't set `frozen`-attribute
    setattr(sys, 'frozen', True) 
    # parse command line options and execute it if needed
    multiprocessing.freeze_support()

此解决方案仅适用于 Windows,如代码所示:

def freeze_support(self):
    '''Check whether this is a fake forked process in a frozen executable.
    If so then run code specified by commandline and exit.
    '''
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from .spawn import freeze_support
        freeze_support()

有一个错误报告:multiprocessing freeze_support needed outside win32 might/might 不会很快修复。

如上面的错误报告中所述,将 frozen 属性设置为 True 并直接从 multiprocessing.spawn 调用 freeze_support 是不够的,因为比信号量跟踪器未正确处理。

我看到了两个选项:使用上述错误报告中尚未发布的补丁来修补您的安装,或者使用下面介绍的自己动手的方法。


这是此答案的早期版本,它更 "experimental" 但提供了更多 insights/details 并提出了一种有点自己动手做的解决方案。

我在linux,所以我用mp.set_start_method('spawn')来模拟windows的行为。

spawn 模式下会发生什么?让我们添加一些 sleeps,以便我们可以调查进程:

#bomb.py
import multiprocessing as mp
import sys
import time

def worker():
    time.sleep(50)
    print('Worker')
    return

if __name__ == '__main__':
        print("Starting...")
        time.sleep(20)
        mp.set_start_method('spawn') ## use spawn!
        jobs = []
        for i in range(5):
            p = mp.Process(target=worker)
            jobs.append(p)
            p.start()

通过使用pgrep python我们可以看到起初只有一个-python进程,然后是7(!)个不同的pids。我们可以通过 cat /proc/<pid>/cmdline 查看命令行参数。 5 个新进程有命令行

-c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork

还有一个:

-c "from multiprocessing.semaphore_tracker import main;main(4)"

也就是说,父进程启动了 6 个新的 python 解释器实例,每个新启动的解释器都执行父进程通过命令行选项发送的代码,信息通过管道共享。这 6 个 python-instances 之一是跟踪器,它观察整个事情。

好的,如果cythonized+embeded会怎​​么样?与正常的 python 相同,唯一的区别是启动 bomb-可执行文件而不是 python。但与 python-解释器不同,它 execute/isn 不知道命令行参数,因此 main 函数一遍又一遍地运行。

有一个简单的修复方法:让 bomb-exe 启动 python 解释器

 ...
 if __name__ == '__main__':
    mp.set_executable(<PATH TO PYTHON>)
 ....

现在 bomb 不再是多处理炸弹!

然而,我们的目标可能不是要有一个 python-解释器,所以我们需要让我们的程序知道可能的命令行:

import re
......
if __name__ == '__main__':
    if len(sys.argv)==3:  # should start in semaphore_tracker mode
        nr=list(map(int, re.findall(r'\d+',sys.argv[2])))          
        sys.argv[1]='--multiprocessing-fork'   # this canary is needed for multiprocessing module to work   
        from multiprocessing.semaphore_tracker import main;main(nr[0])

    elif len(sys.argv)>3: # should start in slave mode
        fd, pipe=map(int, re.findall(r'\d+',sys.argv[2]))
        print("I'm a slave!, fd=%d, pipe=%d"%(fd,pipe)) 
        sys.argv[1]='--multiprocessing-fork'   # this canary is needed for multiprocessing module to work  
        from multiprocessing.spawn import spawn_main; 
        spawn_main(tracker_fd=fd, pipe_handle=pipe)

    else: #main mode
        print("Starting...")
        mp.set_start_method('spawn')
        jobs = []
        for i in range(5):
            p = mp.Process(target=worker)
            jobs.append(p)
            p.start()

现在,我们的炸弹不需要一个独立的 python-interpreter 并在工作人员完成后停止。请注意以下事项:

  1. 决定以何种模式启动 bomb 的方式不是很安全,但我希望你能理解要点
  2. --multiprocessing-fork 只是一个金丝雀,它什么都不做,只是必须存在,请参阅 here

注意:更改后的代码也可以与 python 一起使用,因为在执行 "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork 之后 python 更改了 sys.argv 因此代码不再看到原始命令线和 len(sys.argv)1.

受到 ead 的答案(或那里给定的想法)的启发,我找到了一个非常简单的解决方案 - 或者我们最好称之为解决方法。
对我来说只是将 if 子句更改为

if __name__ == '__main__':
    if len(sys.argv) == 1:
        main()
    else:
        sys.argv[1] = sys.argv[3]
        exec(sys.argv[2])

做到了。
这样做的原因是(在我的例子中): 调用原始 .py 文件时,worker 的 __name__ 设置为 __mp_main__(但所有进程都只是普通的 .py 文件)。
当 运行 (cython) 编译版本时,worker 的 name 不可用,但是 worker 被称为不同的,因此我们可以通过 argv 中的多个参数来识别它们。在我的例子中,工人的 argv 读取

['MYPROGRAMM.exe',
 '-c',
 'from multiprocessing.spawn import spawn_main;
       spawn_main(parent_pid=9316, pipe_handle =392)',
 '--multiprocessing-fork']

因此在argv[2]中找到了激活工人的代码,并通过上层命令执行。
当然,如果您需要编译文件的参数,则需要付出更大的努力,可能需要解析调用中的 parent_pid。但就我而言,那简直是过头了。

我认为基于 submitted bug report 的详细信息,我可以在这里提供可能是最优雅的解决方案

if __name__ == '__main__':
    if sys.argv[0][-4:] == '.exe':
        setattr(sys, 'frozen', True)
    multiprocessing.freeze_support()
    YOURMAINROUTINE()

windows 需要 freeze_support()-调用 - 请参阅 python multiprocessing documentation
如果 运行 在 python 内,只有那一行就已经很好了。
但是不知何故,cython 显然不知道其中的一些事情(文档告诉它用 py2exePyInstallercx_Freeze 进行了测试)。它可以通过 setattr-调用来缓解,它只能在编译时使用,因此由文件扩展名决定。

由于建议的解决方案对我不起作用,我将提供一个额外的答案和解决方法。

我冻结的应用程序也导致了多处理炸弹。我可以通过

解决
  1. 使用 Thread-based parallelism,而不是基于进程的多处理和
  2. 在 Joblib 并行执行中,使用 Parallel(n_jobs=4, prefer="threads"),如 答案所建议的(而不是默认的 prefer="multiprocessing")

我无法让 multiprocessing.Pool 在冻结的应用程序中工作(无论是 prefer="threads" 还是 prefer="multiprocessing"),但可以通过 [=25 切换到基于线程的多处理=]:

# a dependency with joblib
from dep_with_joblib import BigJob
# multiprocessing wrapper for threaded.Thread
from multiprocessing.dummy import Pool as ThreadPool
# instead of
# from multiprocessing import Pool

# thread based parallelism,
# works if `Parallel(n_jobs=4, prefer="threads")` is used
# in joblib (e.g. inside big_job())
POOL = ThreadPool(processes=1)

# as far as I can tell,
# the following Process based Parallelism 
# does _not_ work with frozen app/joblib atm
# POOL = Pool(processes=1)

class MainClass():
    def __init__(self):
        """Init ClusterGen"""
        return

    @staticmethod
    def run_big_job(big_job, data):
        """Run big_job on parallel thread"""
        big_job()
        return big_job

   def big_job_exec(self):
        """Big job execution"""


        bigjob = BigJob()
        big_job_input_data = ...
        # Start big_job on different thread
        async_result = POOL.apply_async(
            MainClass.run_big_job, (bigjob, big_job_input_data))
        # get results from clusterer
        bigjob_results = async_result.get()

更明确的示例 Queuethreading.Thread:

import threading
import queue
# a dependency with joblib
from dep_with_joblib import BigJob

job_queue = queue.Queue()

def store_in_queue(f):
    def wrapper(*args):
        job_queue.put(f(*args))
    return wrapper

class MainClass():
    def __init__(self):
        """Init ClusterGen"""
        return

    @staticmethod
    @store_in_queue
    def run_big_job(big_job, data):
        """Run big_job on parallel thread"""
        big_job()
        return big_job

   def big_job_exec(self):
        """Big job execution"""

        bigjob = BigJob()
        big_job_input_data = ...
        # Start big_job on different thread
        t = threading.Thread(
            target=MainClass.run_big_job,
            args=(bigjob, big_job_input_data),
            group=None,
            name="example-bigjob",
        )
        t.start()
        # get results from big_job
        bigjob_results = job_queue.get()

在上面的两个例子中,bigjob() 在不同的线程上是 运行 异步的。示例可以很容易地用多线程修改。

为什么要异步?在我的例子中,BigJob() 是一个依赖项的模块,它使用 Joblib.Parallel 来提高速度,当我的应用程序被冻结时它不起作用 + 我需要 bigjob() 到 运行 异步以防止我的 GUI 崩溃。