对滞后列求和的最快方法

Fastest way to sum columns with lag

给定一个方阵,我想按行号移动每一行并对列求和。例如:

array([[0, 1, 2],        array([[0, 1, 2],
       [3, 4, 5],    ->            [3, 4, 5],      ->   array([0, 1+3, 2+4+6, 5+7, 8]) = array([0, 4, 12, 12, 8])
       [6, 7, 8]])                    [6, 7, 8]])

我有 4 个解决方案 - , slow, slowest,它们做的事情完全相同,并按速度排名:

def fast(A):
    n = A.shape[0]
    retval = np.zeros(2*n-1)
    for i in range(n):
        retval[i:(i+n)] += A[i, :]
    return retval
def slow(A):
    n = A.shape[0]
    indices = np.arange(n)
    indices = indices + indices[:,None]
    return np.bincount(indices.ravel(), A.ravel())
def slower(A):
    r, _ = A.shape
    retval = np.c_[A, np.zeros((r, r), dtype=A.dtype)].ravel()[:-r].reshape(r, -1)
    return retval.sum(0)
def slowest(A):
    n = A.shape[0]
    retval = np.zeros(2*n-1)
    indices = np.arange(n)
    indices = indices + indices[:,None]
    np.add.at(retval, indices, A)
    return retval

令人惊讶的是,非矢量化解决方案是最快的。这是我的基准:

A = np.random.randn(1000,1000)

%timeit fast(A)
# 1.85 ms ± 20 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

%timeit slow(A)
# 3.28 ms ± 9.55 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit slower(A)
# 4.07 ms ± 18.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit slowest(A)
# 58.4 ms ± 993 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

有没有更快的解决方案?如果不是,有人可以解释为什么实际上 fast 是最快的吗?

编辑

slow 的轻微加速:

def slow(A):
    n = A.shape[0]
    indices = np.arange(2*n-1)
    indices = np.lib.stride_tricks.as_strided(indices, A.shape, (8,8))
    return np.bincount(indices.ravel(), A.ravel())

以与 Pierre 相同的方式绘制运行时(以 2**15 作为上限 - 由于某些原因 slow 不处理此大小)

对于 100x100 的数组,

slow 比任何解决方案(不使用 numba)都快一点。 sum_antidiagonals 仍然是 1000x1000 数组的最佳选择。

最简单的加速方法(在我的电脑上~2/3 倍)是使用你的 fast 方法和 numba 包:

import numba
@numba.jit(nopython=True)
def fastNumba(A):
    n = A.shape[0]
    retval = np.zeros(2*n-1)
    for i in range(n):
        retval[i:(i+n)] += A[i, :]
    return retval

但是只有当此函数多次 运行 时,使用 numba 才有意义。第一次计算函数需要更多时间(因为编译)。

这里有一种方法 有时 比你的“fast()” 版本快,但只在 n 的有限范围内(大约在 30和 1000) 对于 n x n 数组。循环 (fast()) 很难在大型数组 上击败,即使使用 numba,实际上渐近收敛到简单 a.sum(axis=0) 的时间,这表明这与大型数组的效率差不多(对您的循环表示敬意!)

我称之为 sum_antidiagonals() 的方法在 a 的条纹版本上使用 np.add.reduce() 并在来自相对较小的一维数组的合成掩码上使用 np.add.reduce()条纹以创建二维数组的幻觉(不消耗更多内存)。

此外,它不限于方形数组(但 fast() 也可以很容易地适应这种概括,请参阅此 post 底部的 fast_g())。

def sum_antidiagonals(a):
    assert a.flags.c_contiguous
    r, c = a.shape
    s0, s1 = a.strides
    z = np.lib.stride_tricks.as_strided(
        a, shape=(r, c+r-1), strides=(s0 - s1, s1), writeable=False)
    # mask
    kern = np.r_[np.repeat(False, r-1), np.repeat(True, c), np.repeat(False, r-1)]
    mask = np.fliplr(np.lib.stride_tricks.as_strided(
        kern, shape=(r, c+r-1), strides=(1, 1), writeable=False))
    return np.add.reduce(z, where=mask)

注意它不限于方阵:

>>> sum_antidiagonals(np.arange(15).reshape(5,3))
array([ 0,  4, 12, 21, 30, 24, 14])

说明

为了理解它是如何工作的,让我们用一个例子来研究这些条带化数组。

给定初始数组 a(3, 2):

a = np.arange(6).reshape(3, 2)
>>> a
array([[0, 1],
       [2, 3],
       [4, 5]])

# after calculating z in the function
>>> z
array([[0, 1, 2, 3],
       [1, 2, 3, 4],
       [2, 3, 4, 5]])

你可以看到这几乎是我们想要的 sum(axis=0),除了下三角形和上三角形是不需要的。我们真正想要总结的是:

array([[0, 1, -, -],
       [-, 2, 3, -],
       [-, -, 4, 5]])

输入掩码,我们可以从一维内核开始构建它:

kern = np.r_[np.repeat(False, r-1), np.repeat(True, c), np.repeat(False, r-1)]
>>> kern
array([False, False,  True,  True, False, False])

我们使用了一个有趣的切片:(1, 1),这意味着我们重复同一行,但每次滑动一个元素:

>>> np.lib.stride_tricks.as_strided(
...        kern, shape=(r, c+r-1), strides=(1, 1), writeable=False)
array([[False, False,  True,  True],
       [False,  True,  True, False],
       [ True,  True, False, False]])

然后我们只需翻转此 left/right,并将其用作 np.add.reduce()where 参数。

速度

b = np.random.normal(size=(1000, 1000))

# check equivalence with the OP's fast() function:
>>> np.allclose(fast(b), sum_antidiagonals(b))
True

%timeit sum_antidiagonals(b)
# 1.83 ms ± 840 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

%timeit fast(b)
# 2.07 ms ± 15.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

在这种情况下,速度稍微快了一点,但只有大约 10%。

在 300x300 阵列上,sum_antidiagonals()fast() 快 2.27 倍。

不过!

尽管将 zmask 放在一起非常快(在上面的 1000x1000 示例中 np.add.reduce() 之前的整个设置只需要 46 微秒),总和本身是 O[r (r+c)],即使只需要 O[r c] 实际添加(其中 mask == True)。因此,方形阵列的操作大约多了 2 倍。

在 10K x 10K 阵列上,这赶上了我们:

  • fast 需要 95 毫秒,而
  • sum_antidiagonals 需要 208 毫秒。

尺寸范围比较

我们将使用可爱的 perfplot 包来比较 n:

范围内的多种方法的速度
perfplot.show(
    setup=lambda n: np.random.normal(size=(n, n)),
    kernels=[just_sum_0, fast, fast_g, nb_fast_i, nb_fast_ij, sum_antidiagonals],
    n_range=[2 ** k for k in range(3, 16)],
    equality_check=None,  # because of just_sum_0
    xlabel='n',
    relative_to=1,
)

观察结果

  • 如您所见,sum_antidiagonals() 相对于 fast() 的速度优势仅限于 n 的范围,大致在 30 到 1000 之间。
  • 它永远比不上 numba 版本。
  • just_sum_0(),这只是 axis=0 的总和(因此是一个很好的底线基准,几乎不可能被超越),对于大型数组来说只是稍微快一点。这一事实表明 fast() 与大型数组的速度差不多。
  • 令人惊讶的是,numba 在达到一定大小后(即在最初几次运行以“融入”LLVM 编译之后)会有所下降。我不确定为什么会这样,但它似乎对大型阵列很重要。

其他函数的完整代码

(包括 fast 到非方形数组的简单概括)

from numba import njit

@njit
def nb_fast_ij(a):
    # numba loves loops...
    r, c = a.shape
    z = np.zeros(c + r - 1, dtype=a.dtype)
    for i in range(r):
        for j in range(c):
            z[i+j] += a[i, j]
    return z

@njit
def nb_fast_i(a):
    r, c = a.shape
    z = np.zeros(c + r - 1, dtype=a.dtype)
    for i in range(r):
        z[i:i+c] += a[i, :]
    return z

def fast_g(a):
    # generalizes fast() to non-square arrays, also returns the same dtype
    r, c = a.shape
    z = np.zeros(c + r - 1, dtype=a.dtype)
    for i in range(r):
        z[i:i+c] += a[i]
    return z

def fast(A):
    # the OP's code
    n = A.shape[0]
    retval = np.zeros(2*n-1)
    for i in range(n):
        retval[i:(i+n)] += A[i, :]
    return retval

def just_sum_0(a):
    # for benchmarking comparison
    return a.sum(axis=0)