使用 python 多处理优化简单的 CPU 绑定函数
Optimizing a simple CPU bound function with python multiprocessing
我试图了解 multiprocessing.Pool 的工作原理,并且我开发了一个最小示例来说明我的问题。简而言之,我按照示例 Dead simple example of using Multiprocessing Queue, Pool and Locking 使用 pool.map 并行化对数组操作的 CPU 绑定函数。当我遵循该模式时,我只获得了 4 个核心的适度加速,但是如果我改为手动将数组分块为 num_threads,然后使用 pool.map块,我发现加速因子大大超过 4 倍,这对我来说毫无意义。详细信息如下。
首先,函数定义。
def take_up_time():
n = 1e3
while n > 0:
n -= 1
def count_even_numbers(x):
take_up_time()
return np.where(np.mod(x, 2) == 0, 1, 0)
现在定义我们要进行基准测试的函数。
首先是运行连续的函数:
def serial(arr):
return np.sum(map(count_even_numbers,arr))
现在以"standard"方式使用Pool.map的函数:
def parallelization_strategy1(arr):
num_threads = multiprocessing_count()
pool = multiprocessing.Pool(num_threads)
result = pool.map(count_even_numbers,arr)
pool.close()
return np.sum(result)
最后,第二种策略,我手动对数组进行分块,然后 运行 Pool.map 分块(由于 python numpy split array into unequal subarrays 而拆分解决方案)
def split_padded(a,n):
""" Simple helper function for strategy 2
"""
padding = (-len(a))%n
if padding == 0:
return np.split(a, n)
else:
sub_arrays = np.split(np.concatenate((a,np.zeros(padding))),n)
sub_arrays[-1] = sub_arrays[-1][:-padding]
return sub_arrays
def parallelization_strategy2(arr):
num_threads = multiprocessing_count()
sub_arrays = split_padded(arr, num_threads)
pool = multiprocessing.Pool(num_threads)
result = pool.map(count_even_numbers,sub_arrays)
pool.close()
return np.sum(np.array(result))
这是我的数组输入:
npts = 1e3
arr = np.arange(npts)
现在我使用 IPython %timeit 函数来 运行 我的计时,对于 1e3 分,我得到以下结果:
- 串行:10个循环,3个循环中的最佳:每个循环98.7毫秒
- parallelization_strategy1:10 个循环,最好的 3 个:每个循环 77.7 毫秒
- parallelization_strategy2:10 个循环,最好的 3 个:每个循环 22 毫秒
因为我有 4 个内核,策略 1 的加速比令人失望,而策略 2 比最大 4 倍的加速比大得令人怀疑。
当我将 npts 增加到 1e4 时,结果更加令人费解:
- 串行:1个循环,最好的3个:每个循环967毫秒
- parallelization_strategy1:1 个循环,3 个循环中的最佳:每个循环 596 毫秒
- parallelization_strategy2:10 个循环,最好的 3 个:每个循环 22.9 毫秒
所以混淆的两个来源是:
- 策略 2 比朴素的理论极限要快得多
- 出于某种原因,npts=1e4 的 %timeit 仅触发序列和策略 1 的 1 个循环,但策略 2 的 10 个循环。
你的策略不一样!
在第一个策略中,Pool.map
遍历数组,因此为每个数组项调用 count_even_numbers
(因为数组的形状是一维的)。
第二个策略映射数组列表,因此对列表中的每个数组调用 count_even_numbers
。
原来你的例子非常适合 Pythran 模型。编译如下源码count_even.py
:
#pythran export count_even(int [:])
import numpy as np
def count_even_numbers(x):
return np.where(np.mod(x, 2) == 0, 1, 0)
def count_even(arr):
s = 0
#omp parallel for reduction(+:s)
for elem in arr:
s += count_even_numbers(elem)
return s
使用命令行(-fopenmp 激活 OpenMP 注释的处理):
pythran count_even.py -fopenmp
和 运行 timeit
由于转换为本机代码,这已经产生了巨大的加速:
没有 Pythran
$ python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
verryyy long, more than several minutes :-/
用Pythran,一核
$ OMP_NUM_THREADS=1 python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
100 loops, best of 3: 10.3 msec per loop
使用Pythran,两个内核:
$ OMP_NUM_THREADS=2 python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
100 loops, best of 3: 5.5 msec per loop
两倍的速度,并行化正在工作:-)
请注意,OpenMP 支持多线程,而不是多处理。
我试图了解 multiprocessing.Pool 的工作原理,并且我开发了一个最小示例来说明我的问题。简而言之,我按照示例 Dead simple example of using Multiprocessing Queue, Pool and Locking 使用 pool.map 并行化对数组操作的 CPU 绑定函数。当我遵循该模式时,我只获得了 4 个核心的适度加速,但是如果我改为手动将数组分块为 num_threads,然后使用 pool.map块,我发现加速因子大大超过 4 倍,这对我来说毫无意义。详细信息如下。
首先,函数定义。
def take_up_time():
n = 1e3
while n > 0:
n -= 1
def count_even_numbers(x):
take_up_time()
return np.where(np.mod(x, 2) == 0, 1, 0)
现在定义我们要进行基准测试的函数。
首先是运行连续的函数:
def serial(arr):
return np.sum(map(count_even_numbers,arr))
现在以"standard"方式使用Pool.map的函数:
def parallelization_strategy1(arr):
num_threads = multiprocessing_count()
pool = multiprocessing.Pool(num_threads)
result = pool.map(count_even_numbers,arr)
pool.close()
return np.sum(result)
最后,第二种策略,我手动对数组进行分块,然后 运行 Pool.map 分块(由于 python numpy split array into unequal subarrays 而拆分解决方案)
def split_padded(a,n):
""" Simple helper function for strategy 2
"""
padding = (-len(a))%n
if padding == 0:
return np.split(a, n)
else:
sub_arrays = np.split(np.concatenate((a,np.zeros(padding))),n)
sub_arrays[-1] = sub_arrays[-1][:-padding]
return sub_arrays
def parallelization_strategy2(arr):
num_threads = multiprocessing_count()
sub_arrays = split_padded(arr, num_threads)
pool = multiprocessing.Pool(num_threads)
result = pool.map(count_even_numbers,sub_arrays)
pool.close()
return np.sum(np.array(result))
这是我的数组输入:
npts = 1e3
arr = np.arange(npts)
现在我使用 IPython %timeit 函数来 运行 我的计时,对于 1e3 分,我得到以下结果:
- 串行:10个循环,3个循环中的最佳:每个循环98.7毫秒
- parallelization_strategy1:10 个循环,最好的 3 个:每个循环 77.7 毫秒
- parallelization_strategy2:10 个循环,最好的 3 个:每个循环 22 毫秒
因为我有 4 个内核,策略 1 的加速比令人失望,而策略 2 比最大 4 倍的加速比大得令人怀疑。
当我将 npts 增加到 1e4 时,结果更加令人费解:
- 串行:1个循环,最好的3个:每个循环967毫秒
- parallelization_strategy1:1 个循环,3 个循环中的最佳:每个循环 596 毫秒
- parallelization_strategy2:10 个循环,最好的 3 个:每个循环 22.9 毫秒
所以混淆的两个来源是:
- 策略 2 比朴素的理论极限要快得多
- 出于某种原因,npts=1e4 的 %timeit 仅触发序列和策略 1 的 1 个循环,但策略 2 的 10 个循环。
你的策略不一样!
在第一个策略中,Pool.map
遍历数组,因此为每个数组项调用 count_even_numbers
(因为数组的形状是一维的)。
第二个策略映射数组列表,因此对列表中的每个数组调用 count_even_numbers
。
原来你的例子非常适合 Pythran 模型。编译如下源码count_even.py
:
#pythran export count_even(int [:])
import numpy as np
def count_even_numbers(x):
return np.where(np.mod(x, 2) == 0, 1, 0)
def count_even(arr):
s = 0
#omp parallel for reduction(+:s)
for elem in arr:
s += count_even_numbers(elem)
return s
使用命令行(-fopenmp 激活 OpenMP 注释的处理):
pythran count_even.py -fopenmp
和 运行 timeit
由于转换为本机代码,这已经产生了巨大的加速:
没有 Pythran
$ python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
verryyy long, more than several minutes :-/
用Pythran,一核
$ OMP_NUM_THREADS=1 python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
100 loops, best of 3: 10.3 msec per loop
使用Pythran,两个内核:
$ OMP_NUM_THREADS=2 python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
100 loops, best of 3: 5.5 msec per loop
两倍的速度,并行化正在工作:-)
请注意,OpenMP 支持多线程,而不是多处理。