混合 Numpy 矢量化和普通 Python
Hybridizing Numpy vectorization and plain Python
我有一个二维的 ndarray
,其中的行将被扫描以检查任何一个是否与其他任何一个相等。
我的第一次尝试确实有效,但我觉得这不是最佳方式。一旦矩阵中的行数接近 1000,就需要时间。
我的代码如下。 X
就是前面提到的数组,Y
也是一个二维的ndarray
。
for i in range(X.shape[0]-1):
for j in range(i+1,X.shape[0]):
if (np.all( (X[i,:] == X[j,:] ), axis = 0 )):
Y[j,:] = Y[i,:]
#endif
#enddo
#enddo
我知道嵌套循环很耗时,应该避免,但我找不到替代方法。列表理解在我看来不合适,因为不需要保存项目。
该过程的核心是赋值操作 Y[j,:] = Y[i,:]
,它是 索引相关的 ,这一事实将导致我排除类似列表理解的解决方案.
问题 然后是:是否有更有效的方法来编写此类利用 numpy
矢量化的搜索?
看下面的例子:
例如,考虑一个 True 和 False 的一维向量,您要计算序列中“False 到 True”转换的次数:
np.random.seed(444)
x = np.random.choice([False, True], size=100000)
使用 Python for-loop,一种方法是成对评估序列中每个元素的真值及其紧随其后的元素:
def count_transitions(x) -> int:
count = 0
for i, j in zip(x[:-1], x[1:]):
if j and not i:
count += 1
return count
count_transitions(x)
在向量化形式中,没有对单个元素的显式 for-loop 或直接引用:
np.count_nonzero(x[:-1] < x[1:])
这两个等效函数在性能方面如何比较?在这种特殊情况下,矢量化 NumPy 调用胜出约 70 倍
方法 #1
我们可以利用 row-views 来获得成对匹配。然后,运行 循环并分配 Y
中的那些。我们的想法是在我们开始 运行ning 循环后尽量减少工作量。考虑到可能有多个索引与其他索引匹配,很难提出纯矢量化方法。实现看起来像这样 -
# @Divakar
def view1D(a): # a is array
a = np.ascontiguousarray(a)
void_dt = np.dtype((np.void, a.dtype.itemsize * a.shape[1]))
return a.view(void_dt).ravel()
# Get 1D view
a1D = view1D(a)
# Perform broadcasting to get outer equality match
mask = a1D[:,None]==a1D
# Get indices of pairwise matches
n = len(mask)
mask[np.tri(n, dtype=bool)] = 0
idx = np.argwhere(mask)
# Run loop to assign equal rows in Y
for (i,j) in zip(idx[:,0],idx[:,1]):
Y[j] = Y[i]
备选方案#1:使用掩码直接赋值
所以,用mask
,直接赋值Y
中的行,像这样-
for i,m in enumerate(mask):
if m.any():
Y[m] = Y[i]
如果有很多匹配项,这会很有帮助。
备选方案 #2:使用简化的掩码
如果两行之间有不止一行的公共行,我们可能希望减少这些行以将所有这些链接到第一个出现的行。因此,我们可以生成一个 reduced-mask 并使用它来代替之前的 mask
-
mask0 = np.zeros_like(mask)
mask0[mask.argmax(0), np.arange(len(mask))] = 1
np.fill_diagonal(mask0,0)
然后,使用mask0
代替mask
并赋值。
方法 #2
另一种方法是从 1D
视图方法开始,然后使用 sorting-based 方法设置成对匹配索引,就像这样 -
sidx = a1D.argsort() # a1D from earlier approach
b = a1D[sidx]
m0 = b[:-1] == b[1:]
m1 = np.r_[False,m0,False]
idx = np.flatnonzero(m1[:-1]!=m1[1:]).reshape(-1,2)
for (i,j) in idx:
row0,row1 = sidx[i],sidx[i+1:j+1]
Y[row1] = Y[row0]
我正在使用 phone,所以我无法对此进行测试,但我认为它会起作用
mask = np.all(X[:, None] == X[None], axis=-1)
ind1, ind2 = np.nonzero(mask)
ind1, ind2 = ind1[ind1 < ind2], ind2[ind1 < ind2]
Y[ind2] = Y[ind1]
我有一个二维的 ndarray
,其中的行将被扫描以检查任何一个是否与其他任何一个相等。
我的第一次尝试确实有效,但我觉得这不是最佳方式。一旦矩阵中的行数接近 1000,就需要时间。
我的代码如下。 X
就是前面提到的数组,Y
也是一个二维的ndarray
。
for i in range(X.shape[0]-1):
for j in range(i+1,X.shape[0]):
if (np.all( (X[i,:] == X[j,:] ), axis = 0 )):
Y[j,:] = Y[i,:]
#endif
#enddo
#enddo
我知道嵌套循环很耗时,应该避免,但我找不到替代方法。列表理解在我看来不合适,因为不需要保存项目。
该过程的核心是赋值操作 Y[j,:] = Y[i,:]
,它是 索引相关的 ,这一事实将导致我排除类似列表理解的解决方案.
问题 然后是:是否有更有效的方法来编写此类利用 numpy
矢量化的搜索?
看下面的例子: 例如,考虑一个 True 和 False 的一维向量,您要计算序列中“False 到 True”转换的次数:
np.random.seed(444)
x = np.random.choice([False, True], size=100000)
使用 Python for-loop,一种方法是成对评估序列中每个元素的真值及其紧随其后的元素:
def count_transitions(x) -> int:
count = 0
for i, j in zip(x[:-1], x[1:]):
if j and not i:
count += 1
return count
count_transitions(x)
在向量化形式中,没有对单个元素的显式 for-loop 或直接引用:
np.count_nonzero(x[:-1] < x[1:])
这两个等效函数在性能方面如何比较?在这种特殊情况下,矢量化 NumPy 调用胜出约 70 倍
方法 #1
我们可以利用 row-views 来获得成对匹配。然后,运行 循环并分配 Y
中的那些。我们的想法是在我们开始 运行ning 循环后尽量减少工作量。考虑到可能有多个索引与其他索引匹配,很难提出纯矢量化方法。实现看起来像这样 -
# @Divakar
def view1D(a): # a is array
a = np.ascontiguousarray(a)
void_dt = np.dtype((np.void, a.dtype.itemsize * a.shape[1]))
return a.view(void_dt).ravel()
# Get 1D view
a1D = view1D(a)
# Perform broadcasting to get outer equality match
mask = a1D[:,None]==a1D
# Get indices of pairwise matches
n = len(mask)
mask[np.tri(n, dtype=bool)] = 0
idx = np.argwhere(mask)
# Run loop to assign equal rows in Y
for (i,j) in zip(idx[:,0],idx[:,1]):
Y[j] = Y[i]
备选方案#1:使用掩码直接赋值
所以,用mask
,直接赋值Y
中的行,像这样-
for i,m in enumerate(mask):
if m.any():
Y[m] = Y[i]
如果有很多匹配项,这会很有帮助。
备选方案 #2:使用简化的掩码
如果两行之间有不止一行的公共行,我们可能希望减少这些行以将所有这些链接到第一个出现的行。因此,我们可以生成一个 reduced-mask 并使用它来代替之前的 mask
-
mask0 = np.zeros_like(mask)
mask0[mask.argmax(0), np.arange(len(mask))] = 1
np.fill_diagonal(mask0,0)
然后,使用mask0
代替mask
并赋值。
方法 #2
另一种方法是从 1D
视图方法开始,然后使用 sorting-based 方法设置成对匹配索引,就像这样 -
sidx = a1D.argsort() # a1D from earlier approach
b = a1D[sidx]
m0 = b[:-1] == b[1:]
m1 = np.r_[False,m0,False]
idx = np.flatnonzero(m1[:-1]!=m1[1:]).reshape(-1,2)
for (i,j) in idx:
row0,row1 = sidx[i],sidx[i+1:j+1]
Y[row1] = Y[row0]
我正在使用 phone,所以我无法对此进行测试,但我认为它会起作用
mask = np.all(X[:, None] == X[None], axis=-1)
ind1, ind2 = np.nonzero(mask)
ind1, ind2 = ind1[ind1 < ind2], ind2[ind1 < ind2]
Y[ind2] = Y[ind1]