如何使用多处理将函数应用于二维 numpy 数组
How to apply a function to a 2D numpy array with multiprocessing
假设我有以下功能:
def f(x,y):
return x*y
如何使用多处理模块将函数应用于 NxM 二维 numpy 数组中的每个元素?使用串行迭代,代码可能如下所示:
import numpy as np
N = 10
M = 12
results = np.zeros(shape=(N,M))
for x in range(N):
for y in range(M):
results[x,y] = f(x,y)
不确定您的情况是否需要多处理。在上面的简单例子中,你可以做
X, Y = numpy.meshgrid(numpy.arange(10), numpy.arange(12))
result = X*Y
以下是使用 multiprocesssing
并行化示例函数的方法。我还包含了一个几乎完全相同的纯 Python 函数,它使用非并行 for
循环,以及一个实现相同结果的 numpy 单行函数:
import numpy as np
from multiprocessing import Pool
def f(x,y):
return x * y
# this helper function is needed because map() can only be used for functions
# that take a single argument (see
def splat_f(args):
return f(*args)
# a pool of 8 worker processes
pool = Pool(8)
def parallel(M, N):
results = pool.map(splat_f, ((i, j) for i in range(M) for j in range(N)))
return np.array(results).reshape(M, N)
def nonparallel(M, N):
out = np.zeros((M, N), np.int)
for i in range(M):
for j in range(N):
out[i, j] = f(i, j)
return out
def broadcast(M, N):
return np.prod(np.ogrid[:M, :N])
现在让我们看看性能:
%timeit parallel(1000, 1000)
# 1 loops, best of 3: 1.67 s per loop
%timeit nonparallel(1000, 1000)
# 1 loops, best of 3: 395 ms per loop
%timeit broadcast(1000, 1000)
# 100 loops, best of 3: 2 ms per loop
非并行纯 Python 版本比并行版本高出大约 4 倍,使用 numpy 数组广播的版本绝对 碾压 其他两个。
问题在于启动和停止 Python 子进程会带来大量开销,而且您的测试函数是如此微不足道,以至于每个工作线程只花费其生命周期的一小部分时间来做有用的工作。只有当每个线程在被杀死之前都有大量工作要做时,多处理才有意义。例如,您可以给每个工作人员一个更大的输出数组块来计算(尝试将 chunksize=
参数弄乱到 pool.map()
),但是对于这样一个微不足道的例子,我怀疑您会看到一个很大的进步。
我不知道您的实际代码是什么样的 - 也许您的函数足够大且昂贵,足以保证使用多处理。但是,我敢打赌有 很多 更好的方法来提高其性能。
假设我有以下功能:
def f(x,y):
return x*y
如何使用多处理模块将函数应用于 NxM 二维 numpy 数组中的每个元素?使用串行迭代,代码可能如下所示:
import numpy as np
N = 10
M = 12
results = np.zeros(shape=(N,M))
for x in range(N):
for y in range(M):
results[x,y] = f(x,y)
不确定您的情况是否需要多处理。在上面的简单例子中,你可以做
X, Y = numpy.meshgrid(numpy.arange(10), numpy.arange(12))
result = X*Y
以下是使用 multiprocesssing
并行化示例函数的方法。我还包含了一个几乎完全相同的纯 Python 函数,它使用非并行 for
循环,以及一个实现相同结果的 numpy 单行函数:
import numpy as np
from multiprocessing import Pool
def f(x,y):
return x * y
# this helper function is needed because map() can only be used for functions
# that take a single argument (see
def splat_f(args):
return f(*args)
# a pool of 8 worker processes
pool = Pool(8)
def parallel(M, N):
results = pool.map(splat_f, ((i, j) for i in range(M) for j in range(N)))
return np.array(results).reshape(M, N)
def nonparallel(M, N):
out = np.zeros((M, N), np.int)
for i in range(M):
for j in range(N):
out[i, j] = f(i, j)
return out
def broadcast(M, N):
return np.prod(np.ogrid[:M, :N])
现在让我们看看性能:
%timeit parallel(1000, 1000)
# 1 loops, best of 3: 1.67 s per loop
%timeit nonparallel(1000, 1000)
# 1 loops, best of 3: 395 ms per loop
%timeit broadcast(1000, 1000)
# 100 loops, best of 3: 2 ms per loop
非并行纯 Python 版本比并行版本高出大约 4 倍,使用 numpy 数组广播的版本绝对 碾压 其他两个。
问题在于启动和停止 Python 子进程会带来大量开销,而且您的测试函数是如此微不足道,以至于每个工作线程只花费其生命周期的一小部分时间来做有用的工作。只有当每个线程在被杀死之前都有大量工作要做时,多处理才有意义。例如,您可以给每个工作人员一个更大的输出数组块来计算(尝试将 chunksize=
参数弄乱到 pool.map()
),但是对于这样一个微不足道的例子,我怀疑您会看到一个很大的进步。
我不知道您的实际代码是什么样的 - 也许您的函数足够大且昂贵,足以保证使用多处理。但是,我敢打赌有 很多 更好的方法来提高其性能。