带掩码的 numpy 数组的邻居总和

Neighbors sum of numpy array with mask

我有两个大数组,一个包含值,一个基本上是掩码。下面的代码显示了我要实现的功能。

from scipy.signal import convolve2d
import numpy as np

sample = np.array([[6, 4, 5, 5, 5],
                   [7, 1, 0, 8, 3],
                   [2, 5, 4, 8, 4],
                   [2, 0, 2, 6, 0],
                   [5, 7, 2, 3, 2]])

mask = np.array([[1, 0, 1, 1, 0],
                 [0, 0, 1, 0, 1],
                 [0, 1, 0, 0, 0],
                 [0, 0, 0, 1, 0],
                 [1, 1, 0, 0, 1]])

neighbors_sum = convolve2d(sample, np.ones((3,3), dtype=int), mode='same', boundary='wrap')

# neighbors_sum = np.array([[40, 37, 35, 33, 44],
#                           [37, 34, 40, 42, 48],
#                           [24, 23, 34, 35, 40],
#                           [27, 29, 37, 31, 32],
#                           [31, 33, 34, 30, 34]])

result = np.where(mask, neighbors_sum, 0)

print(result)

这段代码有效,得到了我期望的结果:

np.array([[40,  0, 35, 33,  0],
          [ 0,  0, 40,  0, 48],
          [ 0, 23,  0,  0,  0],
          [ 0,  0,  0, 31,  0],
          [31, 33,  0,  0, 34]])

到目前为止,还不错。但是,当我增加数组的大小时,我遇到了一些大问题。在我的例子中,我需要一个 50,000x20,000 输入和一个 100x100 求和掩码,而不是 5x5 输入和 3x3 求和掩码。当我转向那个时,convolve2d 函数遇到了各种麻烦并且计算非常长。

考虑到我只关心屏蔽结果,因此只关心 convolve2d 在这些点上的求和,有人能想出一个聪明的方法吗?转到 for 循环并仅选择感兴趣的点会失去矢量化的速度优势,所以我不认为这是值得的。

欢迎提出任何建议!

convolve2d 在这种情况下效率很低。由于掩码是 np.ones,由于 separable filtering,您可以将过滤器分成两个简单的过滤器:一个 np.ones(100, 1) 过滤器和一个 np.ones(1, 100) 过滤器。此外,rolling sum 可用于进一步加速计算。

这是一个没有滚动总和的简单解决方案:

# Simple faster implementation

tmp = convolve2d(sample, np.ones((1,100), dtype=int), mode='same', boundary='wrap')
neighbors_sum = convolve2d(tmp, np.ones((100,1), dtype=int), mode='same', boundary='wrap')
result = np.where(mask, neighbors_sum, 0)

您可以使用 Numba 有效地计算滚动总和。该策略是将计算分为 3 个部分:水平滚动和、垂直滚动和和最终掩码。每个步骤都可以使用 多线程 完全并行化(尽管使用 Numba 并行化垂直滚动和更难)。每个部分都需要逐行工作,以便缓存友好

# Complex very-fast implementation

import numba as nb

# Numerical results may diverge if the input contains big 
# values with many small ones.
# Does not support inputs containing NaN values or +/- Inf ones.
@nb.njit('float64[:,::1](float64[:,::1], int_)', parallel=True, fastmath=True)
def horizontalRollingSum(sample, filterSize):
    n, m = sample.shape
    fs = filterSize

    # Make the wrapping part of the rolling sum much simpler
    assert fs >= 1
    assert n >= fs and m >= fs

    # Horizontal rolling sum.
    tmp = np.empty((n, m), dtype=np.float64)
    for i in nb.prange(n):
        s = 0.0
        lShift = fs//2
        rShift = (fs-1)//2
        for j in range(m-lShift, m):
            s += sample[i, j]
        for j in range(0, rShift+1):
            s += sample[i, j]
        tmp[i, 0] = s
        for j in range(1, m):
            jLeft, jRight = (j-1-lShift)%m, (j+rShift)%m
            s += sample[i, jRight] - sample[i, jLeft]
            tmp[i, j] = s

    return tmp

@nb.njit('float64[:,::1](float64[:,::1], int_)', fastmath=True)
def verticaltalRollingSum(sample, filterSize):
    n, m = sample.shape
    fs = filterSize

    # Make the wrapping part of the rolling sum much simpler
    assert fs >= 1
    assert n >= fs and m >= fs

    # Horizontal rolling sum.
    tmp = np.empty((n, m), dtype=np.float64)
    tShift = fs//2
    bShift = (fs-1)//2
    for j in range(m):
        tmp[0, j] = 0.0
    for i in range(n-tShift, n):
        for j in range(m):
            tmp[0, j] += sample[i, j]
    for i in range(0, bShift+1):
        for j in range(m):
            tmp[0, j] += sample[i, j]
    for i in range(1, n):
        iTop = (i-1-tShift)%n
        iBot = (i+bShift)%n
        for j in range(m):
            tmp[i, j] = tmp[i-1, j] + (sample[iBot, j] - sample[iTop, j])

    return tmp

@nb.njit('float64[:,::1](float64[:,::1], int_[:,::1], int_)', parallel=True, fastmath=True)
def compute(sample, mask, filterSize):
    n, m = sample.shape
    tmp = horizontalRollingSum(sample, filterSize)
    neighbors_sum = verticaltalRollingSum(tmp, filterSize)
    res = np.empty((n, m), dtype=np.float64)

    for i in nb.prange(n):
        for j in range(n):
            res[i, j] = neighbors_sum[i, j] * mask[i, j]

    return res

基准和注释

测试代码如下:

n, m = 5000, 2000
sample = np.random.rand(n, m)
mask = (np.random.rand(n, m) < 0.05).astype(int)

以下是我的 6 核机器上的结果:

Initial solution:           174366 ms   (x1)
With separate filters:        5710 ms   (x31)
Final Numba solution:           40 ms   (x4359)
Optimal theoretical time:       10 ms   (optimistic)

因此,Numba 实施比初始实施快 4359 倍

话虽这么说,但请注意最后一个实现在输入数组方面可能存在的数值问题(请参阅代码中的注释)。只要np.std(sample)比较小,np.all(np.isfinite(sample))为真就可以了。

注意代码可以进一步优化:垂直滚动求和可以并行化;横向滚动求和可以避免模数运算;垂直滚动总和和屏蔽步骤可以合并在一起(即通过计算 res on-the-fly 而不存储 tmp);平铺可用于以更 cache-friendly 的方式同时计算所有步骤。但是,这些优化使代码更加复杂,其中一些非常难以执行(尤其是最后一个与 Numba 的优化)。

请注意,使用 布尔掩码 (而不是 integer-based)应该会使算法更快,因为它占用更少的内存并且处理器可以更快地获取值。