Python 多维数组的快速数组乘法

Python fast array multiplication for multidimensional arrays

我有两个三维数组,AB,其中

  1. A 的尺寸为 (500 x 500 x 80),并且
  2. B 的尺寸为 (500 x 80 x 2000)。

在两个数组中,大小为 80 的维度可以称为 'time'(例如 80 个时间点 i)。大小为 2000 的维度可以称为 'scenario'(我们有 2000 scenarios)。

我需要做的是取 500 x 500 矩阵 A[:, :, i] 并在每个 scenario 和时间的相应时间 B[:, i, scenario] 乘以每个 500 元素列向量i.

我最终得到了下面的代码

from scipy.stats import norm
import numpy as np
A = norm.rvs(size = (500, 500, 80),  random_state = 0)
B = norm.rvs(size = (500, 80, 2000), random_state = 0)
result = np.einsum('ijk,jkl->ikl', A, B, optimize=True)

虽然对于同样的问题,一个天真的方法是使用嵌套的 for 循环

for scenario in range(2000):
    for i in range(80):
         out[:, i, scenario] = A[:, :, i] @ B[:, i, scenario]

我预计 einsum 会很快,因为问题 'only' 涉及对大型数组的简单操作,但它实际上运行了几分钟。

我把上面einsum的速度和我们假设A中的每个矩阵都一样的情况进行了比较,我们可以保持 A 作为一个 (500 x 500) 矩阵(而不是 3d 数组),然后整个问题可以写成

A = norm.rvs(size = (500, 500),      random_state = 0)
B = norm.rvs(size = (500, 80, 2000), random_state = 0)
result = np.einsum('ij,jkl->ikl', A, B, optimize=True)

这很快,只运行了几秒钟。比上面 'slightly' 更一般的情况快得多。

我的问题是 - 我是否以计算效率高的形式用慢速 einsum 编写一般情况?

你可以做得比现有的两个嵌套循环一个一个循环更好-

m = A.shape[0]
n = B.shape[2]
r = A.shape[2]
out1 = np.empty((m,r,n), dtype=np.result_type(A.dtype, B.dtype))
for i in range(r):
    out1[:,i,:] = A[:, :, i] @ B[:, i,:]

或者,np.matmul/@ operator -

out = (A.transpose(2,0,1) @ B.transpose(1,0,2)).swapaxes(0,1)

这两个似乎比 einsum 版本更好。

时间

案例 #1:缩放 1/4 尺寸

In [44]: m = 500
    ...: n = 2000
    ...: r = 80
    ...: m,n,r = m//4, n//4, r//4
    ...: 
    ...: A = norm.rvs(size = (m, m, r),  random_state = 0)
    ...: B = norm.rvs(size = (m, r, n), random_state = 0)

In [45]: %%timeit
    ...: out1 = np.empty((m,r,n), dtype=np.result_type(A.dtype, B.dtype))
    ...: for i in range(r):
    ...:     out1[:,i,:] = A[:, :, i] @ B[:, i,:]
175 ms ± 6.54 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [46]: %timeit (A.transpose(2,0,1) @ B.transpose(1,0,2)).swapaxes(0,1)
165 ms ± 1.11 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [47]: %timeit np.einsum('ijk,jkl->ikl', A, B, optimize=True)
483 ms ± 13.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

随着我们扩大规模,内存拥塞将开始有利于 one-loop 版本。

案例 #2:缩放 1/2 尺寸

In [48]: m = 500
    ...: n = 2000
    ...: r = 80
    ...: m,n,r = m//2, n//2, r//2
    ...: 
    ...: A = norm.rvs(size = (m, m, r),  random_state = 0)
    ...: B = norm.rvs(size = (m, r, n), random_state = 0)

In [49]: %%timeit
    ...: out1 = np.empty((m,r,n), dtype=np.result_type(A.dtype, B.dtype))
    ...: for i in range(r):
    ...:     out1[:,i,:] = A[:, :, i] @ B[:, i,:]
2.9 s ± 58.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [50]: %timeit (A.transpose(2,0,1) @ B.transpose(1,0,2)).swapaxes(0,1)
3.02 s ± 94.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

案例 #3:缩放 67% 尺寸

In [59]: m = 500
    ...: n = 2000
    ...: r = 80
    ...: m,n,r = int(m/1.5), int(n/1.5), int(r/1.5)

In [60]: A = norm.rvs(size = (m, m, r),  random_state = 0)
    ...: B = norm.rvs(size = (m, r, n), random_state = 0)

In [61]: %%timeit
    ...: out1 = np.empty((m,r,n), dtype=np.result_type(A.dtype, B.dtype))
    ...: for i in range(r):
    ...:     out1[:,i,:] = A[:, :, i] @ B[:, i,:]
25.8 s ± 4.9 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [62]: %timeit (A.transpose(2,0,1) @ B.transpose(1,0,2)).swapaxes(0,1)
29.2 s ± 2.41 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

Numba spin-off

from numba import njit, prange
    
@njit(parallel=True)
def func1(A, B):
    m = A.shape[0]
    n = B.shape[2]
    r = A.shape[2]
    out = np.empty((m,r,n))
    for i in prange(r):
        out[:,i,:] = A[:, :, i] @ B[:, i,:]
    return out

案例 #3 的时间 -

In [80]: m = 500
    ...: n = 2000
    ...: r = 80
    ...: m,n,r = int(m/1.5), int(n/1.5), int(r/1.5)

In [81]: A = norm.rvs(size = (m, m, r),  random_state = 0)
    ...: B = norm.rvs(size = (m, r, n), random_state = 0)

In [82]: %timeit func1(A, B)
653 ms ± 10.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)