向量化 3D 数组的 NumPy 协方差
Vectorizing NumPy covariance for 3D array
我有一个形状为 (t, n1, n2)
:
的 3D numpy 数组
x = np.random.rand(10, 2, 4)
我需要计算另一个 3D 数组 y
,其形状为 (t, n1, n1)
,使得:
y[0] = np.cov(x[0,:,:])
...沿第一个轴的所有切片依此类推。
因此,一个循环实现将是:
y = np.zeros((10,2,2))
for i in np.arange(x.shape[0]):
y[i] = np.cov(x[i, :, :])
有什么方法可以对其进行矢量化,以便我可以一次性计算出所有协方差矩阵?我尝试这样做:
x1 = x.swapaxes(1, 2)
y = np.dot(x, x1)
但是没有用。
入侵 numpy.cov source code
并尝试使用默认参数。事实证明,np.cov(x[i,:,:])
只是 :
N = x.shape[2]
m = x[i,:,:]
m -= np.sum(m, axis=1, keepdims=True) / N
cov = np.dot(m, m.T) /(N - 1)
因此,任务是向量化此循环,该循环将遍历 i
并一次处理来自 x
的所有数据。同样,我们可以在第三步使用 broadcasting
。对于最后一步,我们将沿着第一个轴上的所有切片执行 sum-reduction
。这可以通过 np.einsum
以矢量化的方式有效地实现。于是,最终的实现就变成了这个-
N = x.shape[2]
m1 = x - x.sum(2,keepdims=1)/N
y_out = np.einsum('ijk,ilk->ijl',m1,m1) /(N - 1)
运行时测试
In [155]: def original_app(x):
...: n = x.shape[0]
...: y = np.zeros((n,2,2))
...: for i in np.arange(x.shape[0]):
...: y[i]=np.cov(x[i,:,:])
...: return y
...:
...: def proposed_app(x):
...: N = x.shape[2]
...: m1 = x - x.sum(2,keepdims=1)/N
...: out = np.einsum('ijk,ilk->ijl',m1,m1) / (N - 1)
...: return out
...:
In [156]: # Setup inputs
...: n = 10000
...: x = np.random.rand(n,2,4)
...:
In [157]: np.allclose(original_app(x),proposed_app(x))
Out[157]: True # Results verified
In [158]: %timeit original_app(x)
1 loops, best of 3: 610 ms per loop
In [159]: %timeit proposed_app(x)
100 loops, best of 3: 6.32 ms per loop
那里有巨大的加速!
我有一个形状为 (t, n1, n2)
:
x = np.random.rand(10, 2, 4)
我需要计算另一个 3D 数组 y
,其形状为 (t, n1, n1)
,使得:
y[0] = np.cov(x[0,:,:])
...沿第一个轴的所有切片依此类推。
因此,一个循环实现将是:
y = np.zeros((10,2,2))
for i in np.arange(x.shape[0]):
y[i] = np.cov(x[i, :, :])
有什么方法可以对其进行矢量化,以便我可以一次性计算出所有协方差矩阵?我尝试这样做:
x1 = x.swapaxes(1, 2)
y = np.dot(x, x1)
但是没有用。
入侵 numpy.cov source code
并尝试使用默认参数。事实证明,np.cov(x[i,:,:])
只是 :
N = x.shape[2]
m = x[i,:,:]
m -= np.sum(m, axis=1, keepdims=True) / N
cov = np.dot(m, m.T) /(N - 1)
因此,任务是向量化此循环,该循环将遍历 i
并一次处理来自 x
的所有数据。同样,我们可以在第三步使用 broadcasting
。对于最后一步,我们将沿着第一个轴上的所有切片执行 sum-reduction
。这可以通过 np.einsum
以矢量化的方式有效地实现。于是,最终的实现就变成了这个-
N = x.shape[2]
m1 = x - x.sum(2,keepdims=1)/N
y_out = np.einsum('ijk,ilk->ijl',m1,m1) /(N - 1)
运行时测试
In [155]: def original_app(x):
...: n = x.shape[0]
...: y = np.zeros((n,2,2))
...: for i in np.arange(x.shape[0]):
...: y[i]=np.cov(x[i,:,:])
...: return y
...:
...: def proposed_app(x):
...: N = x.shape[2]
...: m1 = x - x.sum(2,keepdims=1)/N
...: out = np.einsum('ijk,ilk->ijl',m1,m1) / (N - 1)
...: return out
...:
In [156]: # Setup inputs
...: n = 10000
...: x = np.random.rand(n,2,4)
...:
In [157]: np.allclose(original_app(x),proposed_app(x))
Out[157]: True # Results verified
In [158]: %timeit original_app(x)
1 loops, best of 3: 610 ms per loop
In [159]: %timeit proposed_app(x)
100 loops, best of 3: 6.32 ms per loop
那里有巨大的加速!