Python 中的多处理:并行化 for 循环以填充 Numpy 数组

Multiprocessing in Python: Parallelize a for loop to fill a Numpy array

我一直在阅读 this one 之类的话题,但其中任何一个似乎都适合我的情况。我正在尝试并行化以下玩具示例,以在 Python:

中使用多处理在 for 循环中填充 Numpy 数组
import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        for idx, val in enumerate(range(1, n+1)):
            result = pool.apply_async(func1, [val])
            my_array[idx] = result.get()
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(60000, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()

基于 Multiprocessing 的代码行似乎有效,并为您提供了正确的结果。但是,它比非并行化版本花费的时间要长得多。这是两个版本的输出

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Normal time: 0.01605963706970215

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Time based on multiprocessing: 2.8775112628936768

我的直觉告诉我,这应该是从 pool.apply_async() 捕获结果的更好方法。我究竟做错了什么?实现这一目标的最有效方法是什么?谢谢

创建流程的成本很高。在我的机器上,每个创建的进程至少需要几百微秒。此外,多处理模块在进程之间复制要计算的数据,然后从进程池中收集结果。 进程间通信也很慢。问题是你的计算是微不足道的,可以很快完成,可能比所有引入的开销快得多。 multiprocessing 模块仅在处理非常小的数据集并执行密集计算(与计算数据量相比)时才有用。

希望,当涉及到使用 Numpynumericals 计算时,有一种简单快速的方法可以并行化您的应用程序:Numba JIT。如果您显式使用并行结构(parallel=Trueprange),Numba 可以并行化代码。它使用 线程 而不是在 共享内存 中工作的繁重进程。如果您的代码不处理 本机类型和 Numpy 数组 而不是纯 Python 动态对象(列表,大整数,类,等等)。这是一个例子:

import numpy as np
import numba as nb
import time

@nb.njit
def func1(x, y=1):
    return x**2 + y

@nb.njit('float64[:](int64)', parallel=True)
def func2(n):
    my_array = np.zeros(n)
    for i in nb.prange(1, n+1):
        my_array[i-1] = func1(i)
    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Numba time: {}\n".format(end-start))

if __name__ == '__main__':
    main()

因为 Numba 在运行时编译代码,所以它能够将循环完全优化为无操作,在这种情况下导致时间接近 0 秒。

这是@thisisalsomypassword 提出的解决方案,它改进了我最初的提议。也就是说,“在循环内的列表中收集 AsyncResult 对象,然后在每个结果对象上启动所有进程后调用 AsyncResult.get()”:

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    time.sleep(0.1)
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        ####### HERE COMES THE CHANGE #######
        results = [pool.apply_async(func1, [val]) for val in range(1, n+1)]
        for idx, val in enumerate(results):
            my_array[idx] = val.get()
        #######
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(600)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(600, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()

现在可以了。多处理大大减少了时间:

Normal time: 60.107836008071
Time based on multiprocessing: 10.049324989318848    

time.sleep(0.1) 已添加到 func1 中,以抵消成为超级琐碎任务的影响。