带掩码的 numpy 数组的邻居总和
Neighbors sum of numpy array with mask
我有两个大数组,一个包含值,一个基本上是掩码。下面的代码显示了我要实现的功能。
from scipy.signal import convolve2d
import numpy as np
sample = np.array([[6, 4, 5, 5, 5],
[7, 1, 0, 8, 3],
[2, 5, 4, 8, 4],
[2, 0, 2, 6, 0],
[5, 7, 2, 3, 2]])
mask = np.array([[1, 0, 1, 1, 0],
[0, 0, 1, 0, 1],
[0, 1, 0, 0, 0],
[0, 0, 0, 1, 0],
[1, 1, 0, 0, 1]])
neighbors_sum = convolve2d(sample, np.ones((3,3), dtype=int), mode='same', boundary='wrap')
# neighbors_sum = np.array([[40, 37, 35, 33, 44],
# [37, 34, 40, 42, 48],
# [24, 23, 34, 35, 40],
# [27, 29, 37, 31, 32],
# [31, 33, 34, 30, 34]])
result = np.where(mask, neighbors_sum, 0)
print(result)
这段代码有效,得到了我期望的结果:
np.array([[40, 0, 35, 33, 0],
[ 0, 0, 40, 0, 48],
[ 0, 23, 0, 0, 0],
[ 0, 0, 0, 31, 0],
[31, 33, 0, 0, 34]])
到目前为止,还不错。但是,当我增加数组的大小时,我遇到了一些大问题。在我的例子中,我需要一个 50,000x20,000 输入和一个 100x100 求和掩码,而不是 5x5 输入和 3x3 求和掩码。当我转向那个时,convolve2d 函数遇到了各种麻烦并且计算非常长。
考虑到我只关心屏蔽结果,因此只关心 convolve2d 在这些点上的求和,有人能想出一个聪明的方法吗?转到 for 循环并仅选择感兴趣的点会失去矢量化的速度优势,所以我不认为这是值得的。
欢迎提出任何建议!
convolve2d
在这种情况下效率很低。由于掩码是 np.ones
,由于 separable filtering,您可以将过滤器分成两个简单的过滤器:一个 np.ones(100, 1)
过滤器和一个 np.ones(1, 100)
过滤器。此外,rolling sum 可用于进一步加速计算。
这是一个没有滚动总和的简单解决方案:
# Simple faster implementation
tmp = convolve2d(sample, np.ones((1,100), dtype=int), mode='same', boundary='wrap')
neighbors_sum = convolve2d(tmp, np.ones((100,1), dtype=int), mode='same', boundary='wrap')
result = np.where(mask, neighbors_sum, 0)
您可以使用 Numba 有效地计算滚动总和。该策略是将计算分为 3 个部分:水平滚动和、垂直滚动和和最终掩码。每个步骤都可以使用 多线程 完全并行化(尽管使用 Numba 并行化垂直滚动和更难)。每个部分都需要逐行工作,以便缓存友好。
# Complex very-fast implementation
import numba as nb
# Numerical results may diverge if the input contains big
# values with many small ones.
# Does not support inputs containing NaN values or +/- Inf ones.
@nb.njit('float64[:,::1](float64[:,::1], int_)', parallel=True, fastmath=True)
def horizontalRollingSum(sample, filterSize):
n, m = sample.shape
fs = filterSize
# Make the wrapping part of the rolling sum much simpler
assert fs >= 1
assert n >= fs and m >= fs
# Horizontal rolling sum.
tmp = np.empty((n, m), dtype=np.float64)
for i in nb.prange(n):
s = 0.0
lShift = fs//2
rShift = (fs-1)//2
for j in range(m-lShift, m):
s += sample[i, j]
for j in range(0, rShift+1):
s += sample[i, j]
tmp[i, 0] = s
for j in range(1, m):
jLeft, jRight = (j-1-lShift)%m, (j+rShift)%m
s += sample[i, jRight] - sample[i, jLeft]
tmp[i, j] = s
return tmp
@nb.njit('float64[:,::1](float64[:,::1], int_)', fastmath=True)
def verticaltalRollingSum(sample, filterSize):
n, m = sample.shape
fs = filterSize
# Make the wrapping part of the rolling sum much simpler
assert fs >= 1
assert n >= fs and m >= fs
# Horizontal rolling sum.
tmp = np.empty((n, m), dtype=np.float64)
tShift = fs//2
bShift = (fs-1)//2
for j in range(m):
tmp[0, j] = 0.0
for i in range(n-tShift, n):
for j in range(m):
tmp[0, j] += sample[i, j]
for i in range(0, bShift+1):
for j in range(m):
tmp[0, j] += sample[i, j]
for i in range(1, n):
iTop = (i-1-tShift)%n
iBot = (i+bShift)%n
for j in range(m):
tmp[i, j] = tmp[i-1, j] + (sample[iBot, j] - sample[iTop, j])
return tmp
@nb.njit('float64[:,::1](float64[:,::1], int_[:,::1], int_)', parallel=True, fastmath=True)
def compute(sample, mask, filterSize):
n, m = sample.shape
tmp = horizontalRollingSum(sample, filterSize)
neighbors_sum = verticaltalRollingSum(tmp, filterSize)
res = np.empty((n, m), dtype=np.float64)
for i in nb.prange(n):
for j in range(n):
res[i, j] = neighbors_sum[i, j] * mask[i, j]
return res
基准和注释
测试代码如下:
n, m = 5000, 2000
sample = np.random.rand(n, m)
mask = (np.random.rand(n, m) < 0.05).astype(int)
以下是我的 6 核机器上的结果:
Initial solution: 174366 ms (x1)
With separate filters: 5710 ms (x31)
Final Numba solution: 40 ms (x4359)
Optimal theoretical time: 10 ms (optimistic)
因此,Numba 实施比初始实施快 4359 倍。
话虽这么说,但请注意最后一个实现在输入数组方面可能存在的数值问题(请参阅代码中的注释)。只要np.std(sample)
比较小,np.all(np.isfinite(sample))
为真就可以了。
注意代码可以进一步优化:垂直滚动求和可以并行化;横向滚动求和可以避免模数运算;垂直滚动总和和屏蔽步骤可以合并在一起(即通过计算 res
on-the-fly 而不存储 tmp
);平铺可用于以更 cache-friendly 的方式同时计算所有步骤。但是,这些优化使代码更加复杂,其中一些非常难以执行(尤其是最后一个与 Numba 的优化)。
请注意,使用 布尔掩码 (而不是 integer-based)应该会使算法更快,因为它占用更少的内存并且处理器可以更快地获取值。
我有两个大数组,一个包含值,一个基本上是掩码。下面的代码显示了我要实现的功能。
from scipy.signal import convolve2d
import numpy as np
sample = np.array([[6, 4, 5, 5, 5],
[7, 1, 0, 8, 3],
[2, 5, 4, 8, 4],
[2, 0, 2, 6, 0],
[5, 7, 2, 3, 2]])
mask = np.array([[1, 0, 1, 1, 0],
[0, 0, 1, 0, 1],
[0, 1, 0, 0, 0],
[0, 0, 0, 1, 0],
[1, 1, 0, 0, 1]])
neighbors_sum = convolve2d(sample, np.ones((3,3), dtype=int), mode='same', boundary='wrap')
# neighbors_sum = np.array([[40, 37, 35, 33, 44],
# [37, 34, 40, 42, 48],
# [24, 23, 34, 35, 40],
# [27, 29, 37, 31, 32],
# [31, 33, 34, 30, 34]])
result = np.where(mask, neighbors_sum, 0)
print(result)
这段代码有效,得到了我期望的结果:
np.array([[40, 0, 35, 33, 0],
[ 0, 0, 40, 0, 48],
[ 0, 23, 0, 0, 0],
[ 0, 0, 0, 31, 0],
[31, 33, 0, 0, 34]])
到目前为止,还不错。但是,当我增加数组的大小时,我遇到了一些大问题。在我的例子中,我需要一个 50,000x20,000 输入和一个 100x100 求和掩码,而不是 5x5 输入和 3x3 求和掩码。当我转向那个时,convolve2d 函数遇到了各种麻烦并且计算非常长。
考虑到我只关心屏蔽结果,因此只关心 convolve2d 在这些点上的求和,有人能想出一个聪明的方法吗?转到 for 循环并仅选择感兴趣的点会失去矢量化的速度优势,所以我不认为这是值得的。
欢迎提出任何建议!
convolve2d
在这种情况下效率很低。由于掩码是 np.ones
,由于 separable filtering,您可以将过滤器分成两个简单的过滤器:一个 np.ones(100, 1)
过滤器和一个 np.ones(1, 100)
过滤器。此外,rolling sum 可用于进一步加速计算。
这是一个没有滚动总和的简单解决方案:
# Simple faster implementation
tmp = convolve2d(sample, np.ones((1,100), dtype=int), mode='same', boundary='wrap')
neighbors_sum = convolve2d(tmp, np.ones((100,1), dtype=int), mode='same', boundary='wrap')
result = np.where(mask, neighbors_sum, 0)
您可以使用 Numba 有效地计算滚动总和。该策略是将计算分为 3 个部分:水平滚动和、垂直滚动和和最终掩码。每个步骤都可以使用 多线程 完全并行化(尽管使用 Numba 并行化垂直滚动和更难)。每个部分都需要逐行工作,以便缓存友好。
# Complex very-fast implementation
import numba as nb
# Numerical results may diverge if the input contains big
# values with many small ones.
# Does not support inputs containing NaN values or +/- Inf ones.
@nb.njit('float64[:,::1](float64[:,::1], int_)', parallel=True, fastmath=True)
def horizontalRollingSum(sample, filterSize):
n, m = sample.shape
fs = filterSize
# Make the wrapping part of the rolling sum much simpler
assert fs >= 1
assert n >= fs and m >= fs
# Horizontal rolling sum.
tmp = np.empty((n, m), dtype=np.float64)
for i in nb.prange(n):
s = 0.0
lShift = fs//2
rShift = (fs-1)//2
for j in range(m-lShift, m):
s += sample[i, j]
for j in range(0, rShift+1):
s += sample[i, j]
tmp[i, 0] = s
for j in range(1, m):
jLeft, jRight = (j-1-lShift)%m, (j+rShift)%m
s += sample[i, jRight] - sample[i, jLeft]
tmp[i, j] = s
return tmp
@nb.njit('float64[:,::1](float64[:,::1], int_)', fastmath=True)
def verticaltalRollingSum(sample, filterSize):
n, m = sample.shape
fs = filterSize
# Make the wrapping part of the rolling sum much simpler
assert fs >= 1
assert n >= fs and m >= fs
# Horizontal rolling sum.
tmp = np.empty((n, m), dtype=np.float64)
tShift = fs//2
bShift = (fs-1)//2
for j in range(m):
tmp[0, j] = 0.0
for i in range(n-tShift, n):
for j in range(m):
tmp[0, j] += sample[i, j]
for i in range(0, bShift+1):
for j in range(m):
tmp[0, j] += sample[i, j]
for i in range(1, n):
iTop = (i-1-tShift)%n
iBot = (i+bShift)%n
for j in range(m):
tmp[i, j] = tmp[i-1, j] + (sample[iBot, j] - sample[iTop, j])
return tmp
@nb.njit('float64[:,::1](float64[:,::1], int_[:,::1], int_)', parallel=True, fastmath=True)
def compute(sample, mask, filterSize):
n, m = sample.shape
tmp = horizontalRollingSum(sample, filterSize)
neighbors_sum = verticaltalRollingSum(tmp, filterSize)
res = np.empty((n, m), dtype=np.float64)
for i in nb.prange(n):
for j in range(n):
res[i, j] = neighbors_sum[i, j] * mask[i, j]
return res
基准和注释
测试代码如下:
n, m = 5000, 2000
sample = np.random.rand(n, m)
mask = (np.random.rand(n, m) < 0.05).astype(int)
以下是我的 6 核机器上的结果:
Initial solution: 174366 ms (x1)
With separate filters: 5710 ms (x31)
Final Numba solution: 40 ms (x4359)
Optimal theoretical time: 10 ms (optimistic)
因此,Numba 实施比初始实施快 4359 倍。
话虽这么说,但请注意最后一个实现在输入数组方面可能存在的数值问题(请参阅代码中的注释)。只要np.std(sample)
比较小,np.all(np.isfinite(sample))
为真就可以了。
注意代码可以进一步优化:垂直滚动求和可以并行化;横向滚动求和可以避免模数运算;垂直滚动总和和屏蔽步骤可以合并在一起(即通过计算 res
on-the-fly 而不存储 tmp
);平铺可用于以更 cache-friendly 的方式同时计算所有步骤。但是,这些优化使代码更加复杂,其中一些非常难以执行(尤其是最后一个与 Numba 的优化)。
请注意,使用 布尔掩码 (而不是 integer-based)应该会使算法更快,因为它占用更少的内存并且处理器可以更快地获取值。