当 Numpy 数组只包含 upper/lower 三角形中的值时,有没有办法加快它们的计算速度?

Is there a way to speed up Numpy array calculations when they only contain values in upper/lower triangle?

我正在做一些只涉及矩阵上三角中的值的矩阵计算 (2d)。

到目前为止,我发现使用 Numpy 的 triu 方法 ("return a copy of a matrix with the elements below the k-th diagonal zeroed") 可以工作并且速度非常快。但据推测,计算仍在对整个矩阵进行,包括对零点的不必要计算。或者他们是?...

这是我首先尝试的示例:

# Initialize vars
N = 160
u = np.empty(N) 
u[0] = 1000
u[1:] = np.cumprod(np.full(N-1, 1/2**(1/16)))*1000
m = np.random.random(N)

def method1():
    # Prepare matrices with values only in upper triangle
    ones_ut = np.triu(np.ones((N, N)))
    u_ut = np.triu(np.broadcast_to(u, (N, N)))
    m_ut = np.triu(np.broadcast_to(m, (N, N)))

    # Do calculation
    return (ones_ut - np.divide(u_ut, u.reshape(N, 1)))**3*m_ut

然后我意识到我只需要将最终结果矩阵归零即可:

def method2():
    return np.triu((np.ones((N, N)) - np.divide(u, u.reshape(N, 1)))**3*m)
assert np.array_equal(method1(), method2())

但令我惊讶的是,这更慢了。

In [62]: %timeit method1()                                                                                
662 µs ± 3.65 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [63]: %timeit method2()                                                                                
836 µs ± 3.74 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

numpy 知道矩阵包含半个零时是否会进行某种特殊优化?

我很好奇为什么它变慢了,但实际上我的主要问题是,有没有一种方法可以通过考虑到您对矩阵中一半的值不感兴趣这一事实来加速矢量化计算?

更新

我尝试只对矩阵的 3 个象限进行计算,但与方法 1 相比没有任何速度提升:

def method4(): 
    split = N//2 
    x = np.zeros((N, N)) 
    u_mat = 1 - u/u.reshape(N, 1) 
    x[:split, :] = u_mat[:split,:]**3*m 
    x[split:, split:] = u_mat[split:, split:]**3*m[split:] 
    return np.triu(x) 
assert np.array_equal(method1(), method4())                                                  
In [86]: %timeit method4()                                                                                
683 µs ± 1.99 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

但这比方法2快

我们应该简化那里的事情,以便在最少的地方利用广播。在此基础上,我们最终会得到这样的结果,使用 um 直接获得最终输出,就像这样 -

np.triu((1-u/u.reshape(N, 1))**3*m)

然后,我们可以利用 numexpr module,它在处理超越运算时表现明显更好,就像这里的情况一样,而且内存效率很高。因此,移植到 numexpr 版本后,它将是 -

import numexpr as ne

np.triu(ne.evaluate('(1-u/u2D)**3*m',{'u2D':u.reshape(N, 1)}))

evaluate 方法中引入屏蔽部分以进一步执行。提升 -

M = np.tri(N,dtype=bool)
ne.evaluate('(1-M)*(1-u/u2D)**3*m',{'u2D':u.reshape(N, 1)})

给定数据集的时间 -

In [25]: %timeit method1()
1000 loops, best of 3: 521 µs per loop

In [26]: %timeit method2()
1000 loops, best of 3: 417 µs per loop

In [27]: %timeit np.triu((1-u/u.reshape(N, 1))**3*m)
1000 loops, best of 3: 408 µs per loop

In [28]: %timeit np.triu(ne.evaluate('(1-u/u2D)**3*m',{'u2D':u.reshape(N, 1)}))
10000 loops, best of 3: 159 µs per loop

In [29]: %timeit ne.evaluate('(1-M)*(1-u/u2D)**3*m',{'u2D':u.reshape(N, 1),'M':np.tri(N,dtype=bool)})
10000 loops, best of 3: 110 µs per loop

请注意,将 u 扩展到 2D 版本的另一种方法是使用 np.newaxis/None,这将是惯用的方法。因此,u.reshape(N, 1) 可以替换为 u[:,None]。不过,这不应该改变时间。

我想答案可能很简单。只需将零放在您不想计算的单元格中,整体计算就会更快。我认为这可以解释为什么 method1()method2().

这里有一些测试来说明这一点。

In [29]: size = (160, 160)                                                      

In [30]: z = np.zeros(size)                                                     

In [31]: r = np.random.random(size) + 1                                         

In [32]: t = np.triu(r)                                                         

In [33]: w = np.ones(size)                                                      

In [34]: %timeit z**3                                                           
177 µs ± 1.06 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [35]: %timeit t**3                                                           
376 µs ± 2.28 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [36]: %timeit r**3                                                           
572 µs ± 1.91 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [37]: %timeit w**3                                                           
138 µs ± 548 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [38]: %timeit np.triu(r)**3                                                  
427 µs ± 3.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [39]: %timeit np.triu(r**3)                                                  
625 µs ± 3.87 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

不确定这一切在低水平上是如何工作的,但很明显,零或一的幂比任何其他值的计算时间都少得多。

也很有趣。使用 numexpr 计算没有区别。

In [42]: %timeit ne.evaluate("r**3")                                            
79.2 µs ± 1.32 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [43]: %timeit ne.evaluate("z**3")                                            
79.3 µs ± 1.34 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

所以,我认为不使用 numexpr 最快的可能是这样:

def method5(): 
    return np.triu(1 - u/u[:, None])**3*m  
assert np.array_equal(method1(), method5())                            
In [65]: %timeit method1()                                                      
656 µs ± 2.78 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [66]: %timeit method5()                                                      
587 µs ± 5.05 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

或者,如果你真的在追每一个 micro-second:

def method4b():  
    split = N//2  
    x = np.zeros((N, N))  
    u_mat = np.triu(1 - u/u.reshape(N, 1))  
    x[:split, :] = u_mat[:split,:]**3*m  
    x[split:, split:] = u_mat[split:, split:]**3*m[split:]  
    return x 
assert np.array_equal(method1(), method4b())                           
In [71]: %timeit method4b()                                                     
543 µs ± 3.57 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [72]: %timeit method4b()                                                     
533 µs ± 7.43 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

@Divakar 使用 numexpr 的回答总体上是最快的。

更新

感谢@GZ0 的评论,如果你只需要提升到 3 的幂,这样会更快:

def method6(): 
    a = np.triu(1 - u/u[:, None]) 
    return a*a*a*m 
assert np.isclose(method1(), method6()).all()                          

(但我注意到精度略有下降)。

In [84]: %timeit method6()                                                      
195 µs ± 609 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

事实上,它与@Divakar 的回答中的 numexpr 方法相差不远(在我的机器上为 185/163 µs)。

这是另一种解决方案,在某些情况下速度更快,但在其他情况下速度更慢。

idx = np.triu_indices(N)

def my_method():
    result = np.zeros((N, N))
    t = 1 - u[idx[1]] / u[idx[0]]
    result[idx] = t * t * t * m[idx[1]]
    return result

此处,仅针对(扁平化的)上三角形中的元素进行计算。但是,2D-index-based赋值操作result[idx] = ...有开销。因此,当开销小于节省的计算量时,该方法会更快——这发生在 N 较小或计算相对复杂时(例如,使用 t ** 3 而不是 t * t * t)。

该方法的另一种变体是使用 1D 索引进行赋值操作,这会导致小幅加速。

idx = np.triu_indices(N)
raveled_idx = np.ravel_multi_index(idx, (N, N))
def my_method2():
    result = np.zeros((N, N))
    t = 1 - u[idx[1]] / u[idx[0]]
    result.ravel()[raveled_idx] = t * t * t * m[idx[1]]
    return result

以下是性能测试的结果。请注意,idxraveled_idxN 是固定的,并且不会随着 um 而改变(只要它们的形状保持不变)。因此,可以预先计算它们的值,并且将时间排除在测试之外。 (如果您需要使用许多不同大小的矩阵调用这些方法,则会在 idxraveled_idx 的计算中增加开销。)对于比较,method4bmethod5method6 无法从任何预计算中受益。对于method_ne,预计算M = np.tri(N, dtype=bool)也被排除在测试之外。

%timeit method4b()
%timeit method5()
%timeit method6()
%timeit method_ne()
%timeit my_method()
%timeit my_method2()

结果(N = 160):

1.54 ms ± 7.15 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
1.63 ms ± 11.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
167 µs ± 15.8 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
255 µs ± 14.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
233 µs ± 1.95 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
177 µs ± 907 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

对于 N = 32:

89.9 µs ± 880 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
84 µs ± 728 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
25.2 µs ± 223 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
28.6 µs ± 4.68 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
17.6 µs ± 1.56 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
14.3 µs ± 52.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

对于 N = 1000:

70.7 ms ± 871 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
65.1 ms ± 1.9 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
21.4 ms ± 642 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
3.03 ms ± 342 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
15.2 ms ± 95.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
12.7 ms ± 217 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

my_methodmy_method2 (N = 160) 中使用 t ** 3 而不是 t * t * t

1.53 ms ± 14.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
1.6 ms ± 13.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
156 µs ± 1.62 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
235 µs ± 8.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
1.4 ms ± 4.78 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
1.32 ms ± 9.07 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

在这里,my_methodmy_method2method4bmethod5 稍微好一点。