Python fast array multiplication for multidimensional arrays
I have two 3-dimensional arrays, A and B, where
- A has dimensions (500 x 500 x 80), and
- B has dimensions (500 x 80 x 2000).
In both arrays, the dimension of size 80 can be thought of as 'time' (e.g. 80 time points i), and the dimension of size 2000 as 'scenario' (we have 2000 scenarios).
What I need to do is take the 500 x 500 matrix A[:, :, i] and multiply it by the 500-element column vector B[:, i, scenario], for every scenario and the corresponding time i.
I ended up with the following code
from scipy.stats import norm
import numpy as np
A = norm.rvs(size = (500, 500, 80), random_state = 0)
B = norm.rvs(size = (500, 80, 2000), random_state = 0)
# result[i, k, l] = sum over j of A[i, j, k] * B[j, k, l]
result = np.einsum('ijk,jkl->ikl', A, B, optimize=True)
whereas a naive approach to the same problem would be to use nested for loops
out = np.empty((500, 80, 2000))  # pre-allocate the output so the loop is runnable
for scenario in range(2000):
    for i in range(80):
        out[:, i, scenario] = A[:, :, i] @ B[:, i, scenario]
I expected einsum to be fast, since the problem 'only' involves simple operations on large arrays, but it actually runs for several minutes.
I compared the speed of the einsum above against the case where we assume every matrix in A is identical, so that A can be kept as a single (500 x 500) matrix (rather than a 3d array) and the whole problem can be written as
A = norm.rvs(size = (500, 500), random_state = 0)
B = norm.rvs(size = (500, 80, 2000), random_state = 0)
result = np.einsum('ij,jkl->ikl', A, B, optimize=True)
This is fast and runs in only a couple of seconds, much faster than the only 'slightly' more general case above.
My question is: have I written the general case, with the slow einsum, in a computationally efficient form?
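As a side note (my own minimal check, not part of the original question): np.einsum_path reports what the optimizer plans for a contraction. For a two-operand expression like this there is only one possible contraction order, so it mainly confirms the FLOP count rather than revealing a better ordering, but it is a quick way to rule out the subscripts themselves as the problem.
# Hedged diagnostic: inspect the contraction path einsum's optimizer chooses.
path, description = np.einsum_path('ijk,jkl->ikl', A, B, optimize='optimal')
print(description)  # chosen path plus naive/optimized FLOP and scaling estimates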
You can do better than the existing version with two nested loops by looping over just one axis -
m = A.shape[0]
n = B.shape[2]
r = A.shape[2]
out1 = np.empty((m,r,n), dtype=np.result_type(A.dtype, B.dtype))
for i in range(r):
    out1[:,i,:] = A[:, :, i] @ B[:, i,:]
Or, with np.matmul/the @ operator: A.transpose(2,0,1) has shape (80, 500, 500) and B.transpose(1,0,2) has shape (80, 500, 2000), so matmul batches the 500 x 500 by 500 x 2000 products over the leading time axis, and swapaxes(0,1) puts the result back in (500, 80, 2000) order -
out = (A.transpose(2,0,1) @ B.transpose(1,0,2)).swapaxes(0,1)
Both of these seem to do better than the einsum version.
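As a quick sanity check (a minimal, self-contained sketch of my own, with array sizes reduced for speed), all three formulations agree up to floating-point round-off:
import numpy as np

rng = np.random.default_rng(0)          # small random inputs, sizes are illustrative
A = rng.standard_normal((50, 50, 8))
B = rng.standard_normal((50, 8, 20))

ref = np.einsum('ijk,jkl->ikl', A, B, optimize=True)

out1 = np.empty((50, 8, 20))            # one-loop version
for i in range(8):
    out1[:, i, :] = A[:, :, i] @ B[:, i, :]

out2 = (A.transpose(2, 0, 1) @ B.transpose(1, 0, 2)).swapaxes(0, 1)  # batched matmul

print(np.allclose(ref, out1), np.allclose(ref, out2))  # True True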
Timings
Case #1: Sizes scaled down to 1/4th
In [44]: m = 500
...: n = 2000
...: r = 80
...: m,n,r = m//4, n//4, r//4
...:
...: A = norm.rvs(size = (m, m, r), random_state = 0)
...: B = norm.rvs(size = (m, r, n), random_state = 0)
In [45]: %%timeit
...: out1 = np.empty((m,r,n), dtype=np.result_type(A.dtype, B.dtype))
...: for i in range(r):
...:     out1[:,i,:] = A[:, :, i] @ B[:, i,:]
175 ms ± 6.54 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [46]: %timeit (A.transpose(2,0,1) @ B.transpose(1,0,2)).swapaxes(0,1)
165 ms ± 1.11 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [47]: %timeit np.einsum('ijk,jkl->ikl', A, B, optimize=True)
483 ms ± 13.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
As we scale up, memory congestion would start favouring the one-loop version.
Case #2: Sizes scaled down to 1/2
In [48]: m = 500
...: n = 2000
...: r = 80
...: m,n,r = m//2, n//2, r//2
...:
...: A = norm.rvs(size = (m, m, r), random_state = 0)
...: B = norm.rvs(size = (m, r, n), random_state = 0)
In [49]: %%timeit
...: out1 = np.empty((m,r,n), dtype=np.result_type(A.dtype, B.dtype))
...: for i in range(r):
...:     out1[:,i,:] = A[:, :, i] @ B[:, i,:]
2.9 s ± 58.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [50]: %timeit (A.transpose(2,0,1) @ B.transpose(1,0,2)).swapaxes(0,1)
3.02 s ± 94.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Case #3: Sizes scaled down to 67%
In [59]: m = 500
...: n = 2000
...: r = 80
...: m,n,r = int(m/1.5), int(n/1.5), int(r/1.5)
In [60]: A = norm.rvs(size = (m, m, r), random_state = 0)
...: B = norm.rvs(size = (m, r, n), random_state = 0)
In [61]: %%timeit
...: out1 = np.empty((m,r,n), dtype=np.result_type(A.dtype, B.dtype))
...: for i in range(r):
...:     out1[:,i,:] = A[:, :, i] @ B[:, i,:]
25.8 s ± 4.9 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [62]: %timeit (A.transpose(2,0,1) @ B.transpose(1,0,2)).swapaxes(0,1)
29.2 s ± 2.41 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
Numba spin-off
from numba import njit, prange

@njit(parallel=True)
def func1(A, B):
    m = A.shape[0]
    n = B.shape[2]
    r = A.shape[2]
    out = np.empty((m,r,n))
    # parallelize the per-time-step matrix products across threads
    for i in prange(r):
        out[:,i,:] = A[:, :, i] @ B[:, i,:]
    return out
Timings for Case #3 -
In [80]: m = 500
...: n = 2000
...: r = 80
...: m,n,r = int(m/1.5), int(n/1.5), int(r/1.5)
In [81]: A = norm.rvs(size = (m, m, r), random_state = 0)
...: B = norm.rvs(size = (m, r, n), random_state = 0)
In [82]: %timeit func1(A, B)
653 ms ± 10.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
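A minimal usage sketch (my addition, assuming func1 is defined as above): the first call includes the JIT compilation cost, so time a subsequent call, and the output can be checked against the batched-matmul form.
rng = np.random.default_rng(0)          # small illustrative sizes
A = rng.standard_normal((50, 50, 8))
B = rng.standard_normal((50, 8, 20))

out_nb = func1(A, B)                    # first call triggers compilation
ref = (A.transpose(2, 0, 1) @ B.transpose(1, 0, 2)).swapaxes(0, 1)
print(np.allclose(out_nb, ref))         # True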