Numpy: Vectorize np.argwhere
I have the following data structures in numpy:
import numpy as np
a = np.random.rand(267, 173) # dense img matrix
b = np.random.rand(199) # array of probability samples
My goal is to take each entry i in b, find the x,y coordinates/index positions of all values in a that are <= i, then randomly select one of the values in that subset:
from random import randint
for i in b:
    l = np.argwhere(a <= i) # list of img coordinates where pixel <= i
    sample = l[randint(0, len(l)-1)] # random selection from `l`
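This "works", but I'd like to vectorize the sampling operation (i.e. replace the for loop with apply_along_axis or similar). Does anyone know how this can be done? Any suggestions would be greatly appreciated!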
You can't fully vectorize np.argwhere because the subset has a different, data-dependent size every time. What you can do, though, is speed up the computation pretty dramatically with sorting. Sorting the image once creates a single allocation, while masking the image at every step creates a temporary array for the mask and another for the extracted elements. With a sorted image, you can just apply np.searchsorted to get the sizes:
a_sorted = np.sort(a.ravel())
indices = np.searchsorted(a_sorted, b, side='right')
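As a quick check of why this works, here is a minimal sketch (the fixed seed and the names counts and sizes_match are my own additions for illustration). With side='right', each entry of indices should equal the number of elements of a that are <= the corresponding threshold, which is exactly the size of the subset the original loop builds with np.argwhere:

```python
import numpy as np

rng = np.random.default_rng(0)   # fixed seed, chosen only for reproducibility
a = rng.random((267, 173))       # dense img matrix
b = rng.random(199)              # array of probability samples

a_sorted = np.sort(a.ravel())
indices = np.searchsorted(a_sorted, b, side='right')

# Subset sizes computed the slow way: one boolean mask per threshold
counts = np.array([np.count_nonzero(a <= i) for i in b])

sizes_match = np.array_equal(indices, counts)
print(sizes_match)
```

If the two arrays agree, the sorted image can stand in for the per-threshold masks.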
You'll still need a loop to do the sampling, but you can do something like
samples = np.array([a_sorted[np.random.randint(i)] for i in indices])
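Getting x-y coordinates instead of sample values is a bit more complicated with this system. You can use np.unravel_index to get the indices, but first you must convert from the reference frame of a_sorted to that of a.ravel(). If you sort using np.argsort instead of np.sort, you get the indices into the original array. Fortunately, np.searchsorted supports this exact scenario through the sorter parameter: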
a_ind = np.argsort(a, axis=None)
indices = np.searchsorted(a.ravel(), b, side='right', sorter=a_ind)
r, c = np.unravel_index(a_ind[[np.random.randint(i) for i in indices]], a.shape)
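r and c are the same size as b, and hold the row and column indices into a of each selection based on b. The index conversion depends on the strides of your array, so we'll assume that you're using C order, as 90% of arrays do by default.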
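The remaining Python-level loop can probably be removed as well. The sketch below is not part of the answer above: it assumes a NumPy version that provides np.random.default_rng, and relies on Generator.integers accepting an array for high. The valid mask is a hypothetical guard I added for thresholds smaller than every pixel, where the subset is empty and nothing can be sampled:

```python
import numpy as np

rng = np.random.default_rng(0)   # fixed seed, chosen only for reproducibility
a = rng.random((267, 173))       # dense img matrix
b = rng.random(199)              # array of probability samples

a_ind = np.argsort(a, axis=None)
indices = np.searchsorted(a.ravel(), b, side='right', sorter=a_ind)

# Guard (my addition): a threshold below a.min() has an empty subset
valid = indices > 0

# One draw per threshold, uniform over the half-open range [0, indices[k])
picks = rng.integers(0, indices[valid])

# Map positions in sorted order back to (row, col) in the C-ordered image
r, c = np.unravel_index(a_ind[picks], a.shape)

all_within = bool(np.all(a[r, c] <= b[valid]))
print(all_within)
```

Here r and c line up with b[valid] rather than with b, so any thresholds that were skipped have to be handled separately by the caller.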
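Complexity
Let's say b has size M and a has size N.
Your current algorithm does a linear search through each element of a for each element of b. At each iteration, it allocates a mask for the matching elements (N/2 on average), and then a buffer of the same size to hold the masked selection. This means that the time complexity is on the order of O(M * N) and the space complexity is the same.
My algorithm sorts a first, which is O(N log N). Then it searches for M insertion points, which is O(M log N). Finally, it selects M samples. The space it allocates is one sorted copy of the image and two arrays of size M. It therefore has O((M + N) log N) time complexity and O(M + N) space complexity.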
Here is an alternative approach that argsorts b instead and then bins a accordingly, using np.digitize and this post:
import numpy as np
from scipy import sparse
from timeit import timeit
import math

def h_digitize(a,bs,right=False):
    # digitize via a fine lookup table; only ambiguous cells fall back to np.digitize
    mx,mn = a.max(),a.min()
    asz = mx-mn
    bsz = bs[-1]-bs[0]
    nbins=int(bs.size*math.sqrt(bs.size)*asz/bsz)
    bbs = np.concatenate([[0],((nbins-1)*(bs-mn)/asz).astype(int).clip(0,nbins),[nbins]])
    bins = np.repeat(np.arange(bs.size+1), np.diff(bbs))
    bbs = bbs[:bbs.searchsorted(nbins)]
    bins[bbs] = -1
    aidx = bins[((nbins-1)*(a-mn)/asz).astype(int)]
    ambig = aidx == -1
    aa = a[ambig]
    if aa.size:
        aidx[ambig] = np.digitize(aa,bs,right)
    return aidx

def f_pp():
    bo = b.argsort()
    bs = b[bo]
    aidx = h_digitize(a,bs,right=True).ravel()
    aux = sparse.csr_matrix((aidx,aidx,np.arange(aidx.size+1)),
                            (aidx.size,b.size+1)).tocsc()
    ridx = np.empty(b.size,int)
    ridx[bo] = aux.indices[np.fromiter(map(np.random.randint,aux.indptr[1:-1].tolist()),int,b.size)]
    return np.unravel_index(ridx,a.shape)

def f_mp():
    a_ind = np.argsort(a, axis=None)
    indices = np.searchsorted(a.ravel(), b, sorter=a_ind, side='right')
    return np.unravel_index(a_ind[[np.random.randint(i) for i in indices]], a.shape)

a = np.random.rand(267, 173) # dense img matrix
b = np.random.rand(199) # array of probability samples

# round to test whether equality is handled correctly
a = np.round(a,3)
b = np.round(b,3)

# total seconds for 1000 calls == milliseconds per call
print('pp',timeit(f_pp, number=1000),'ms')
print('mp',timeit(f_mp, number=1000),'ms')

# sanity checks
S = np.max([a[f_pp()] for _ in range(1000)],axis=0)
T = np.max([a[f_mp()] for _ in range(1000)],axis=0)
print(f"inequality satisfied: pp {(S<=b).all()} mp {(T<=b).all()}")
print(f"largest smalles distance to boundary: pp {(b-S).max()} mp {(b-T).max()}")
print(f"equality done right: pp {not (b-S).all()} mp {not (b-T).all()}")
Using a tweaked digitize I'm a bit faster, but this may vary with problem size. Also, @MadPhysicist's solution is much less convoluted. With the standard digitize we are about equal.
pp 2.620121960993856 ms
mp 3.301037881989032 ms
inequality satisfied: pp True mp True
largest smalles distance to boundary: pp 0.0040000000000000036 mp 0.006000000000000005
equality done right: pp True mp True
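To make the binning idea easier to follow, here is a minimal sketch that uses the standard np.digitize rather than the tweaked h_digitize above (the fixed seed and the names k, counts and reference are my own). With right=True and sorted thresholds bs, each pixel is assigned the index of the smallest threshold that is >= the pixel, so a cumulative count over the bins should reproduce the subset sizes that np.searchsorted returns:

```python
import numpy as np

rng = np.random.default_rng(0)   # fixed seed, chosen only for reproducibility
a = rng.random((267, 173))       # dense img matrix
b = rng.random(199)              # array of probability samples

bs = np.sort(b)

# For each pixel: index of the smallest sorted threshold that is >= the pixel
# (equal to bs.size when the pixel exceeds every threshold)
k = np.digitize(a.ravel(), bs, right=True)

# Pixels with bin index <= j are exactly those with value <= bs[j]
counts = np.bincount(k, minlength=bs.size + 1).cumsum()[:bs.size]

# Same sizes obtained by sorting the image instead of the thresholds
reference = np.searchsorted(np.sort(a.ravel()), bs, side='right')

counts_match = np.array_equal(counts, reference)
print(counts_match)
```

This only illustrates the counting step; the answer's f_pp additionally groups the pixel indices by bin (via the sparse matrix) so that one index can be drawn per threshold.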
A slight improvement on @MadPhysicist's algorithm to make it more vectorized:
%%timeit
a_ind = np.argsort(a, axis=None)
indices = np.searchsorted(a.ravel(), b, sorter=a_ind)
r, c = np.unravel_index(a_ind[[np.random.randint(i) for i in indices]], a.shape)
100 loops, best of 3: 6.32 ms per loop
%%timeit
a_ind = np.argsort(a, axis=None)
indices = np.searchsorted(a.ravel(), b, sorter=a_ind)
r, c = np.unravel_index(a_ind[(np.random.rand(indices.size) * indices).astype(int)], a.shape)
100 loops, best of 3: 4.16 ms per loop
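One detail worth checking before reusing the two timed snippets: they call np.searchsorted without side='right', so they count elements strictly less than each threshold instead of <=. A tiny hand-made example (the values are mine, picked to include a tie) shows the difference:

```python
import numpy as np

a_sorted = np.array([0.1, 0.2, 0.2, 0.3])
b = np.array([0.2])

left = np.searchsorted(a_sorted, b)                 # default side='left': elements < 0.2
right = np.searchsorted(a_sorted, b, side='right')  # elements <= 0.2

print(left, right)
```

For continuous random floats ties are vanishingly rare, so the timings are unlikely to be affected, but with rounded or integer images side='right' is what matches the <= in the question.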
@PaulPanzer's solution still dominates, though I'm not sure what it is caching:
%timeit f_pp()
The slowest run took 14.79 times longer than the fastest. This could mean that an intermediate result is being cached.
100 loops, best of 3: 1.88 ms per loop