How to vectorize an operation with vectors of different sizes
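I have vectors of different sizes and want to perform element-wise operations on them. How can I optimize the following for loop in Python (for example with np.vectorize())?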
import numpy as np
n = 1000000
vec1 = np.random.rand(n)
vec2 = np.random.rand(3*n)
vec3 = np.random.rand(3*n)
for i in range(len(vec1)):
    if vec1[i] < 0.5:
        vec2[3*i : 3*(i+1)] = vec1[i]*vec3[3*i : 3*(i+1)]
    else:
        vec2[3*i : 3*(i+1)] = [0,0,0]
Thank you very much for your help.
We can leverage broadcasting:
v = vec3.reshape(-1,3)*vec1[:,None]
m = vec1<0.5
vec2_out = (v*m[:,None]).ravel()
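As a quick sanity check, the broadcasting version can be compared with the original loop on a small input. This is only a sketch: loop_reference is a hypothetical helper name introduced here, and n = 5 with a fixed seed are arbitrary choices.

```python
import numpy as np

def loop_reference(vec1, vec3):
    """Original loop from the question, returning a fresh result array."""
    out = np.empty_like(vec3)
    for i in range(len(vec1)):
        if vec1[i] < 0.5:
            out[3*i : 3*(i+1)] = vec1[i] * vec3[3*i : 3*(i+1)]
        else:
            out[3*i : 3*(i+1)] = 0
    return out

rng = np.random.default_rng(0)
n = 5
vec1 = rng.random(n)
vec3 = rng.random(3 * n)

# View vec3 as n rows of 3 and scale row i by vec1[i] via broadcasting.
v = vec3.reshape(-1, 3) * vec1[:, None]
# Zero the rows where the condition fails (True acts as 1, False as 0).
m = vec1 < 0.5
vec2_out = (v * m[:, None]).ravel()

matches = np.allclose(vec2_out, loop_reference(vec1, vec3))
print(matches)
```

The key step is vec1[:, None], which turns the length-n vector into an (n, 1) column so that each value is applied to a whole row of three.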
Another way to express it would be:
mask = vec1<0.5
vec2_out = (vec3.reshape(-1,3)*(vec1*mask)[:,None]).ravel()
And with the multi-core numexpr module:
import numexpr as ne
d = {'V3r':vec3.reshape(-1,3),'vec12D':vec1[:,None]}
out = ne.evaluate('V3r*vec12D*(vec12D<0.5)',d).ravel()
Timings:
In [84]: n = 1000000
...: np.random.seed(0)
...: vec1 = np.random.rand(n)
...: vec2 = np.random.rand(3*n)
...: vec3 = np.random.rand(3*n)
In [86]: %%timeit
...: v = vec3.reshape(-1,3)*vec1[:,None]
...: m = vec1<0.5
...: vec2_out = (v*m[:,None]).ravel()
10 loops, best of 3: 23.2 ms per loop
In [87]: %%timeit
...: mask = vec1<0.5
...: vec2_out = (vec3.reshape(-1,3)*(vec1*mask)[:,None]).ravel()
100 loops, best of 3: 13.1 ms per loop
In [88]: %%timeit
...: d = {'V3r':vec3.reshape(-1,3),'vec12D':vec1[:,None]}
...: out = ne.evaluate('V3r*vec12D*(vec12D<0.5)',d).ravel()
100 loops, best of 3: 4.11 ms per loop
For the general case, where the else part might not be zeros, it would be:
mask = vec1<0.5
IF_vals = vec3.reshape(-1,3)*vec1[:,None]
ELSE_vals = np.array([1,1,1])
out = np.where(mask[:,None],IF_vals,ELSE_vals).ravel()
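To make the general case concrete, here is a tiny worked sketch with two rows. The input values and the else values [7, 8, 9] are made up for illustration; they are not from the question.

```python
import numpy as np

vec1 = np.array([0.2, 0.9])
vec3 = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])

mask = vec1 < 0.5                               # [True, False]
if_vals = vec3.reshape(-1, 3) * vec1[:, None]   # shape (2, 3)
else_vals = np.array([7.0, 8.0, 9.0])           # shape (3,), broadcast over rows

# Row 0 passes the condition and is scaled by 0.2;
# row 1 fails it and is replaced by else_vals.
out = np.where(mask[:, None], if_vals, else_vals).ravel()
print(out)
```

Note that np.where evaluates both branches for every row before selecting, so this trades some extra work for the flexibility of an arbitrary else branch.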
numpy.vectorize is for convenience, not performance, per the docs:
"The vectorize function is provided primarily for convenience, not for performance. The implementation is essentially a for loop."
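The "essentially a for loop" point can be observed by counting how often the wrapped Python function runs. This is a rough sketch; scale_if_small and counter are hypothetical names introduced here, and n = 1000 is arbitrary.

```python
import numpy as np

counter = {"calls": 0}

def scale_if_small(x, y):
    """Scalar version of the loop body; counts each Python-level call."""
    counter["calls"] += 1
    return x * y if x < 0.5 else 0.0

rng = np.random.default_rng(0)
n = 1000
vec1 = rng.random(n)
vec3 = rng.random(3 * n)

# otypes is given explicitly so the output dtype does not have to be inferred.
vfunc = np.vectorize(scale_if_small, otypes=[float])
out = vfunc(np.repeat(vec1, 3), vec3)

# The function is invoked once per output element, just like an explicit loop.
print(counter["calls"])
```

So np.vectorize gives broadcasting-style call syntax, but the per-element Python overhead remains.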
A solution that actually vectorizes is:
vec2[:] = vec1.repeat(3) * vec3 # Bulk compute all results
vec2[(vec1 >= 0.5).repeat(3)] = 0 # Zero the results you meant to exclude (the loop keeps vec1 < 0.5)
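Another approach (minimizing temporaries) is to filter and reshape vec1 so that it can be assigned to vec2, then multiply vec2 by vec3 in place. This avoids temporaries beyond the two n-length arrays created in the first step, e.g.: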
vec2.reshape(-1, 3)[:] = (vec1 * (vec1 < 0.5)).reshape(-1, 1)
vec2 *= vec3
If vec1 can be modified, you can shave off one more temporary, simplifying to:
vec1 *= vec1 < 0.5
vec2.reshape(-1, 3)[:] = vec1.reshape(-1, 1)
vec2 *= vec3
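The loop in the question keeps the rows where vec1 < 0.5 and zeroes the rest, so any mask-based rewrite has to keep (vec1 < 0.5) and zero (vec1 >= 0.5). A small check of both in-place variants against the loop might look like the following sketch; loop_reference is a hypothetical helper name, and n = 8 with a fixed seed are arbitrary.

```python
import numpy as np

def loop_reference(vec1, vec3):
    """Original loop from the question, returning a fresh result array."""
    out = np.empty_like(vec3)
    for i in range(len(vec1)):
        if vec1[i] < 0.5:
            out[3*i : 3*(i+1)] = vec1[i] * vec3[3*i : 3*(i+1)]
        else:
            out[3*i : 3*(i+1)] = 0
    return out

rng = np.random.default_rng(1)
n = 8
vec1 = rng.random(n)
vec3 = rng.random(3 * n)
expected = loop_reference(vec1, vec3)

# Variant 1: bulk compute, then zero the excluded triples.
vec2_a = rng.random(3 * n)
vec2_a[:] = vec1.repeat(3) * vec3
vec2_a[(vec1 >= 0.5).repeat(3)] = 0

# Variant 2: write the masked multiplier through an (n, 3) view,
# then multiply by vec3 in place.
vec2_b = rng.random(3 * n)
vec2_b.reshape(-1, 3)[:] = (vec1 * (vec1 < 0.5)).reshape(-1, 1)
vec2_b *= vec3

repeat_ok = np.allclose(vec2_a, expected)
in_place_ok = np.allclose(vec2_b, expected)
print(repeat_ok, in_place_ok)
```

Variant 2 relies on reshape returning a view for a contiguous 1-D array, which is what lets the assignment land in vec2 itself.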
The reshape/broadcasting that @Divakar demonstrated is equivalent to rewriting your iteration as:
In [5]: n = 10
...: vec1 = np.random.rand(n)
...: vec2 = np.zeros((n,3))
...: vec3 = np.random.rand(n,3)
...:
...: for i in range(len(vec1)):
...:     if vec1[i] < 0.5:
...:         vec2[i,:] = vec1[i]*vec3[i,:]
...:     else:
...:         vec2[i,:] = 0
...:
In [6]: vec2
Out[6]:
array([[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0.119655 , 0.05079028, 0.00392748],
[0.04529872, 0.04630456, 0.01565116],
[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0.08361475, 0.21825921, 0.1273483 ]])
In [7]: vec1
Out[7]:
array([0.934649 , 0.85309325, 0.50775071, 0.91246865, 0.12970539,
0.13075136, 0.89861756, 0.68921343, 0.80572879, 0.25996369])
By defining vec2 as an (n,3) array, we replace the indexing vec2[3*i : 3*(i+1)] with vec2[i,:], or simply vec2[i].
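The correspondence between the flat slices and the 2-D rows can be checked directly. In this sketch, flat stands in for vec2 and n = 4 is arbitrary; for a contiguous array, reshape returns a view rather than a copy.

```python
import numpy as np

n = 4
flat = np.arange(3 * n, dtype=float)   # stands in for the flat vec2
rows = flat.reshape(n, 3)              # (n, 3) view of the same buffer

i = 2
# Row i of the 2-D view holds exactly the elements of the flat slice.
same_values = np.array_equal(rows[i], flat[3*i : 3*(i+1)])
shares = np.shares_memory(rows, flat)
print(same_values, shares)

# Writing through the view changes the flat array too.
rows[i] = 0
print(flat[3*i : 3*(i+1)])
```

This is why the flat and (n, 3) formulations can be mixed freely: no data is copied when switching between them.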
Using a mask to set values to 0 is a good basic numpy idea. But ufuncs also provide a where parameter, which can be used as:
In [11]: vec2 = np.zeros((n,3))
In [12]: np.multiply(vec1[:,None],vec3, out=vec2, where=vec1[:,None]<0.5);
In [13]: vec2
Out[13]:
array([[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0.119655 , 0.05079028, 0.00392748],
[0.04529872, 0.04630456, 0.01565116],
[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0. , 0. , 0. ],
[0.08361475, 0.21825921, 0.1273483 ]])
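This where needs to be used together with the out parameter, because the multiply is performed only where the condition is True; the other elements of out keep whatever values they already had.
I'm not sure how much time it saves.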
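The interaction between where and out is easiest to see with a sentinel value. In this sketch the inputs and the -1.0 fill value are made up for illustration; the point is that rows failing the condition are left untouched, which is why the example above starts from np.zeros.

```python
import numpy as np

vec1 = np.array([0.2, 0.9, 0.4])
vec3 = np.arange(1.0, 10.0).reshape(3, 3)

# Pre-fill out with a sentinel so the untouched row is visible.
out = np.full((3, 3), -1.0)
np.multiply(vec1[:, None], vec3, out=out, where=vec1[:, None] < 0.5)

# Rows 0 and 2 hold the products; row 1 (0.9 >= 0.5) still holds -1.0.
print(out)
```

If out were omitted, or created with np.empty, the skipped positions would contain uninitialized values rather than zeros.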