更快的代码来计算具有循环(周期性)边界条件的 numpy 数组中点之间的距离
Faster code to calculate distance between points in numpy array with cyclic (periodic) boundary conditions
我知道如何计算数组中各点之间的欧氏距离
scipy.spatial.distance.cdist
类似于这个问题的答案:
但是,我想在假设循环边界条件的情况下进行计算,例如因此在这种情况下,点 [0,0] 与点 [0,n-1] 的距离为 1,而不是 n-1 的距离。 (然后我将为目标细胞阈值距离内的所有点制作一个掩码,但这不是问题的核心)。
我能想到的唯一方法就是重复计算9次,域索引在x,y和x&y方向上有n added/subtracted,然后堆叠结果并找到最小值跨越 9 个切片。为了说明重复 9 次的必要性,我整理了一个简单的示意图,其中只有 1 个 J 点,用圆圈标记,它显示了一个示例,在这种情况下,三角形标记的单元格在域中有其最近的邻居反映到左上角。
这是我使用 cdist 为此开发的代码:
import numpy as np
from scipy import spatial
n=5 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
i=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
j=np.argwhere(a>0.85) # set of J locations to find distance to.
# this will be used in the KDtree soln
global maxdist
maxdist=2.0
def dist_v1(i,j):
dist=[]
# 3x3 search required for periodic boundaries.
for xoff in [-n,0,n]:
for yoff in [-n,0,n]:
jo=j.copy()
jo[:,0]-=xoff
jo[:,1]-=yoff
dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1))
dist=np.amin(np.stack(dist),0).reshape([n,n])
return(dist)
这有效,并产生例如:
print(dist_v1(i,j))
[[1.41421356 1. 1.41421356 1.41421356 1. ]
[2.23606798 2. 1.41421356 1. 1.41421356]
[2. 2. 1. 0. 1. ]
[1.41421356 1. 1.41421356 1. 1. ]
[1. 0. 1. 1. 0. ]]
零显然标记了 J 点,并且距离是正确的(此 EDIT 纠正了我之前不正确的尝试)。
请注意,如果您更改最后两行以堆叠原始距离,然后仅使用一个最小值,如下所示:
def dist_v2(i,j):
dist=[]
# 3x3 search required for periodic boundaries.
for xoff in [-n,0,n]:
for yoff in [-n,0,n]:
jo=j.copy()
jo[:,0]-=xoff
jo[:,1]-=yoff
dist.append(spatial.distance.cdist(i,jo,metric='euclidean'))
dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n])
return(dist)
对于较小的 n (<10) 速度更快,但对于较大的数组 (n>10)
慢得多
...但无论哪种方式,对于我的大型阵列(N=500 和 J 点数量大约为 70),它都是 慢 ,此搜索占用了大约 99%计算时间,(使用循环也有点难看)- 有 better/faster 方法吗?
我想到的其他选项是:
- scipy.spatial.KDTree.query_ball_point
进一步搜索我发现有一个函数
scipy.spatial.KDTree.query_ball_point 直接计算我的 J 点半径内的坐标,但它似乎没有任何使用周期性边界的工具,所以我认为仍然需要以某种方式使用 3x3 循环、堆栈和然后像我上面那样使用amin,所以我不确定这是否会更快。
我使用这个函数编写了一个解决方案,没有担心周期性边界条件(即这没有回答我的问题)
def dist_v3(n,j):
x, y = np.mgrid[0:n, 0:n]
points = np.c_[x.ravel(), y.ravel()]
tree=spatial.KDTree(points)
mask=np.zeros([n,n])
for results in tree.query_ball_point((j), maxdist):
mask[points[results][:,0],points[results][:,1]]=1
return(mask)
也许我没有以最有效的方式使用它,但这已经和我基于 cdist 的解决方案一样慢,即使没有周期性边界。在两个 cdist 解决方案中包括掩码函数,即将这些函数中的 return(dist)
替换为 return(np.where(dist<=maxdist,1,0))
,然后使用 timeit,我得到以下 n=100 的计时:
from timeit import timeit
print("cdist v1:",timeit(lambda: dist_v1(i,j), number=3)*100)
print("cdist v2:",timeit(lambda: dist_v2(i,j), number=3)*100)
print("KDtree:", timeit(lambda: dist_v3(n,j), number=3)*100)
cdist v1: 181.80927299981704
cdist v2: 554.8205785999016
KDtree: 605.119637199823
为设置距离 [0,0] 内的点创建一个相对坐标数组,然后手动遍历 J 点,使用此相对点列表设置掩码 - 这具有优点是“相对距离”计算只执行一次(我的 J 点在每个时间步都改变),但我怀疑循环会很慢。
为 2D 域中的每个点预先计算一组掩码,因此在模型集成的每个时间步长中,我只需为 J 点选择掩码并应用。这将使用大量内存(与 n^4 成正比)并且可能仍然很慢,因为您需要遍历 J 个点来组合掩码。
这是您的代码的固定版本和速度更快的不同方法。他们给出了相同的结果,所以我有理由相信他们是正确的:
import numpy as np
from scipy.spatial.distance import squareform, pdist, cdist
from numpy.linalg import norm
def pb_OP(A, p=1.0):
distl = []
for *offs, ct in [(0, 0, 0), (0, p, 1), (p, 0, 1), (p, p, 1), (-p, p, 1)]:
B = A - offs
distl.append(cdist(B, A, metric='euclidean'))
if ct:
distl.append(distl[-1].T)
return np.amin(np.dstack(distl), 2)
def pb_pp(A, p=1.0):
out = np.empty((2, A.shape[0]*(A.shape[0]-1)//2))
for o, i in zip(out, A.T):
pdist(i[:, None], 'cityblock', out=o)
out[out > p/2] -= p
return squareform(norm(out, axis=0))
test = np.random.random((1000, 2))
assert np.allclose(pb_OP(test), pb_pp(test))
from timeit import timeit
t_OP = timeit(lambda: pb_OP(test), number=10)*100
t_pp = timeit(lambda: pb_pp(test), number=10)*100
print('OP', t_OP)
print('pp', t_pp)
样本运行。 1000 点:
OP 210.11001259903423
pp 22.288734700123314
我们看到我的方法快了约 9 倍,巧合的是 OP 版本必须检查的偏移量配置数量。它在各个坐标上使用 pdist
来获得绝对差异。如果这些大于网格间距的一半,我们减去一个周期。它仍然采用欧几里德范数并解压缩存储。
这些是我计时的 8 种不同的解决方案,一些是我自己的,一些是针对我的问题发布的,它们使用 4 种广泛的方法:
- 空间 cdist
- 空间KDtree
- Sklearn BallTree
- 内核方法
这是包含 8 个测试例程的代码:
import numpy as np
from scipy import spatial
from sklearn.neighbors import BallTree
n=500 # size of 2D box
f=200./(n*n) # first number is rough number of target cells...
np.random.seed(1) # to make reproducable
a=np.random.uniform(size=(n,n))
i=np.argwhere(a>-1) # all points, we want to know distance to nearest point
j=np.argwhere(a>1.0-f) # set of locations to find distance to.
# long array of 3x3 j points:
for xoff in [0,n,-n]:
for yoff in [0,-n,n]:
if xoff==0 and yoff==0:
j9=j.copy()
else:
jo=j.copy()
jo[:,0]+=xoff
jo[:,1]+=yoff
j9=np.vstack((j9,jo))
global maxdist
maxdist=10
overlap=5.2
kernel_size=int(np.sqrt(overlap*n**2/j.shape[0])/2)
print("no points",len(j))
# repear cdist over each member of 3x3 block
def dist_v1(i,j):
dist=[]
# 3x3 search required for periodic boundaries.
for xoff in [-n,0,n]:
for yoff in [-n,0,n]:
jo=j.copy()
jo[:,0]+=xoff
jo[:,1]+=yoff
dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1))
dist=np.amin(np.stack(dist),0).reshape([n,n])
#dmask=np.where(dist<=maxdist,1,0)
return(dist)
# same as v1, but taking one amin function at the end
def dist_v2(i,j):
dist=[]
# 3x3 search required for periodic boundaries.
for xoff in [-n,0,n]:
for yoff in [-n,0,n]:
jo=j.copy()
jo[:,0]+=xoff
jo[:,1]+=yoff
dist.append(spatial.distance.cdist(i,jo,metric='euclidean'))
dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n])
#dmask=np.where(dist<=maxdist,1,0)
return(dist)
# using a KDTree query ball points, looping over j9 points as in online example
def dist_v3(n,j):
x,y=np.mgrid[0:n,0:n]
points=np.c_[x.ravel(), y.ravel()]
tree=spatial.KDTree(points)
mask=np.zeros([n,n])
for results in tree.query_ball_point((j), 2.1):
mask[points[results][:,0],points[results][:,1]]=1
return(mask)
# using ckdtree query on the j9 long array
def dist_v4(i,j):
tree=spatial.cKDTree(j)
dist,minid=tree.query(i)
return(dist.reshape([n,n]))
# back to using Cdist, but on the long j9 3x3 array, rather than on each element separately
def dist_v5(i,j):
# 3x3 search required for periodic boundaries.
dist=np.amin(spatial.distance.cdist(i,j,metric='euclidean'),1)
#dmask=np.where(dist<=maxdist,1,0)
return(dist)
def dist_v6(i,j):
tree = BallTree(j,leaf_size=5,metric='euclidean')
dist = tree.query(i, k=1, return_distance=True)
mindist = dist[0].reshape(n,n)
return(mindist)
def sq_distance(x1, y1, x2, y2, n):
# computes the pairwise squared distance between 2 sets of points (with periodicity)
# x1, y1 : coordinates of the first set of points (source)
# x2, y2 : same
dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2)
dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2)
d = (dx*dx + dy*dy)
return d
def apply_kernel1(sources, sqdist, kern_size, n, mask):
ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij")
kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
mask_kernel = kernel > kern_size**2
for pi, pj in sources:
ind_i = (pi+ker_i)%n
ind_j = (pj+ker_j)%n
sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
mask[ind_i,ind_j] *= mask_kernel
def apply_kernel2(sources, sqdist, kern_size, n, mask):
ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1))
ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1))
kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
mask_kernel = kernel > kern_size**2
for pi, pj in sources:
imin = pi-kern_size
jmin = pj-kern_size
imax = pi+kern_size+1
jmax = pj+kern_size+1
if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside
sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax])
mask[imin:imax,jmin:jmax] *= mask_kernel
elif imax < n and imin >=0:
ind_j = (pj+ker_j.ravel())%n
sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j])
mask[imin:imax,ind_j] *= mask_kernel
elif jmax < n and jmin >=0:
ind_i = (pi+ker_i.ravel())%n
sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax])
mask[ind_i,jmin:jmax] *= mask_kernel
else :
ind_i = (pi+ker_i)%n
ind_j = (pj+ker_j)%n
sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
mask[ind_i,ind_j] *= mask_kernel
def dist_v7(sources, n, kernel_size,method):
sources = np.asfortranarray(sources) #for memory contiguity
kernel_size = min(kernel_size, n//2)
kernel_size = max(kernel_size, 1)
sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2)
mask = np.ones((n,n), dtype=bool) #which points have not been reached?
#main code
if (method==1):
apply_kernel1(sources, sqdist, kernel_size, n, mask)
else:
apply_kernel2(sources, sqdist, kernel_size, n, mask)
#remaining points
rem_i, rem_j = np.nonzero(mask)
if len(rem_i) > 0:
sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0)
sqdist[rem_i, rem_j] = sq_d
return np.sqrt(sqdist)
from timeit import timeit
nl=10
print ("-----------------------")
print ("Timings for ",nl,"loops")
print ("-----------------------")
print("1. cdist looped amin:",timeit(lambda: dist_v1(i,j),number=nl))
print("2. cdist single amin:",timeit(lambda: dist_v2(i,j),number=nl))
print("3. KDtree ball pt:", timeit(lambda: dist_v3(n,j9),number=nl))
print("4. KDtree query:",timeit(lambda: dist_v4(i,j9),number=nl))
print("5. cdist long list:",timeit(lambda: dist_v5(i,j9),number=nl))
print("6. ball tree:",timeit(lambda: dist_v6(i,j9),number=nl))
print("7. kernel orig:", timeit(lambda: dist_v7(j, n, kernel_size,1), number=nl))
print("8. kernel optimised:", timeit(lambda: dist_v7(j, n, kernel_size,2), number=nl))
我的 linux 12 核台式机(48GB RAM)上 n=350 和 63 点的输出(以秒为单位):
no points 63
-----------------------
Timings for 10 loops
-----------------------
1. cdist looped amin: 3.2488364999881014
2. cdist single amin: 6.494611179979984
3. KDtree ball pt: 5.180531410995172
4. KDtree query: 0.9377906009904109
5. cdist long list: 3.906166430999292
6. ball tree: 3.3540162370190956
7. kernel orig: 0.7813036740117241
8. kernel optimised: 0.17046571199898608
并且对于 n=500 和 npts=176:
no points 176
-----------------------
Timings for 10 loops
-----------------------
1. cdist looped amin: 16.787221198988846
2. cdist single amin: 40.97849371898337
3. KDtree ball pt: 9.926229109987617
4. KDtree query: 0.8417396580043714
5. cdist long list: 14.345821461000014
6. ball tree: 1.8792325239919592
7. kernel orig: 1.0807358759921044
8. kernel optimised: 0.5650744160229806
所以综上所述我得出了以下结论:
- 如果你有相当大的问题,请避免使用 cdist
- 如果您的问题不是太 computational-time 受限,我会推荐“KDtree 查询”方法,因为它只有 2 行没有周期性边界,还有几行有周期性边界来设置 j9 数组
- 为了获得最佳性能(例如,模型的长期集成,在我的情况下,每个时间步都需要这样做),那么 Kernal 解决方案现在是迄今为止最快的。
对于计算多个距离,我认为很难击败简单的 BallTree(或类似的)。
我不太明白循环边界,或者至少为什么你需要循环 3x3 次,正如我所看到的那样,它的行为类似于圆环,足以制作 5 份副本。
更新:确实你需要 3x3 的边缘。我更新了代码。
为了确保我的 minimum_distance
是正确的,为 n = 200
做了一个 np.all( minimum_distance == dist_v1(i,j) )
测试 True
.
对于使用提供的代码生成的 n = 500
,冷启动的 %%time
给出了
CPU times: user 1.12 s, sys: 0 ns, total: 1.12 s
Wall time: 1.11 s
所以我生成了 500 个数据点,就像 post
import numpy as np
n=500 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
i=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
j=np.argwhere(a>0.85) # set of J locations to find distance to.
并使用 BallTree
import numpy as np
from sklearn.neighbors import BallTree
N = j.copy()
NE = j.copy()
E = j.copy()
SE = j.copy()
S = j.copy()
SW = j.copy()
W = j.copy()
NW = j.copy()
N[:,1] = N[:,1] - n
NE[:,0] = NE[:,0] - n
NE[:,1] = NE[:,1] - n
E[:,0] = E[:,0] - n
SE[:,0] = SE[:,0] - n
SE[:,1] = SE[:,1] + n
S[:,1] = S[:,1] + n
SW[:,0] = SW[:,0] + n
SW[:,1] = SW[:,1] + n
W[:,0] = W[:,0] + n
NW[:,0] = NW[:,0] + n
NW[:,1] = NW[:,1] - n
tree = BallTree(np.concatenate([j,N,E,S,W,NE,SE,SW,NW]), leaf_size=15, metric='euclidean')
dist = tree.query(i, k=1, return_distance=True)
minimum_distance = dist[0].reshape(n,n)
更新:
注意这里我把数据复制到盒子的N,E,S,W,NE,SE,NW,SE,来处理边界条件。同样,对于 n = 200
这给出了相同的结果。你可以调整leaf_size
,但我觉得这个设置没问题。
性能对j
中的点数敏感。
[编辑] - 我发现代码跟踪工作完成点的方式存在错误,用 mask_kernel
修复了它。较新代码的纯 python 版本慢了约 1.5 倍,但 numba 版本稍快(由于其他一些优化)。
[当前最佳:原始速度的 100 倍到 120 倍]
首先感谢您提交这个问题,我在优化过程中得到了很多乐趣!
我目前的最佳解决方案依赖于网格是规则的并且“源”点(我们需要从中计算距离的点)大致均匀分布的假设。
这里的想法是所有的距离都将是 1,sqrt(2)
,sqrt(3)
,...所以我们可以预先进行数值计算。然后我们简单地将这些值放在一个矩阵中,并在每个源点周围复制该矩阵(并确保保留在每个点找到的最小值)。这涵盖了绝大多数点 (>99%)。然后我们对剩下的1%应用另一种更“经典”的方法。
代码如下:
import numpy as np
def sq_distance(x1, y1, x2, y2, n):
# computes the pairwise squared distance between 2 sets of points (with periodicity)
# x1, y1 : coordinates of the first set of points (source)
# x2, y2 : same
dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2)
dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2)
d = (dx*dx + dy*dy)
return d
def apply_kernel(sources, sqdist, kern_size, n, mask):
ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij")
kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
mask_kernel = kernel > kern_size**2
for pi, pj in sources:
ind_i = (pi+ker_i)%n
ind_j = (pj+ker_j)%n
sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
mask[ind_i,ind_j] *= mask_kernel
def dist_vf(sources, n, kernel_size):
sources = np.asfortranarray(sources) #for memory contiguity
kernel_size = min(kernel_size, n//2)
kernel_size = max(kernel_size, 1)
sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2)
mask = np.ones((n,n), dtype=bool) #which points have not been reached?
#main code
apply_kernel(sources, sqdist, kernel_size, n, mask)
#remaining points
rem_i, rem_j = np.nonzero(mask)
if len(rem_i) > 0:
sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0)
sqdist[rem_i, rem_j] = sq_d
#eff = 1-rem_i.size/n**2
#print("covered by kernel :", 100*eff, "%")
#print("overlap :", sources.shape[0]*(1+2*kernel_size)**2/n**2)
#print()
return np.sqrt(sqdist)
正在使用
测试此版本
n=500 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
all_points=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
source_points=np.argwhere(a>1-70/n**2) # set of J locations to find distance to.
#
# code for dist_v1 and dist_vf
#
overlap=5.2
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel version:", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
给予
cdist v1 : 1148.6694 ms
kernel version: 69.21876999999998 ms
这已经是 ~17 倍的加速了!我还实现了 sq_distance
和 apply_kernel
的 numba 版本:[这是新的正确版本]
@njit(cache=True)
def sq_distance(x1, y1, x2, y2, n):
m1 = x1.size
m2 = x2.size
n2 = n//2
d = np.empty((m1,m2), dtype=np.int32)
for i in range(m1):
for j in range(m2):
dx = np.abs(x1[i] - x2[j] + n2)%n - n2
dy = np.abs(y1[i] - y2[j] + n2)%n - n2
d[i,j] = (dx*dx + dy*dy)
return d
@njit(cache=True)
def apply_kernel(sources, sqdist, kern_size, n, mask):
# creating the kernel
kernel = np.empty((2*kern_size+1, 2*kern_size+1))
vals = np.arange(-kern_size, kern_size+1)**2
for i in range(2*kern_size+1):
for j in range(2*kern_size+1):
kernel[i,j] = vals[i] + vals[j]
mask_kernel = kernel > kern_size**2
I = sources[:,0]
J = sources[:,1]
# applying the kernel for each point
for l in range(sources.shape[0]):
pi = I[l]
pj = J[l]
if pj - kern_size >= 0 and pj + kern_size<n: #if we are in the middle, no need to do the modulo for j
for i in range(2*kern_size+1):
ind_i = np.mod((pi+i-kern_size), n)
for j in range(2*kern_size+1):
ind_j = (pj+j-kern_size)
sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]
else:
for i in range(2*kern_size+1):
ind_i = np.mod((pi+i-kern_size), n)
for j in range(2*kern_size+1):
ind_j = np.mod((pj+j-kern_size), n)
sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]
return
并使用
进行测试
overlap=5.2
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel numba (first run):", timeit(lambda: dist_vf(source_points, n, kernel_size), number=1)*1000, "ms") #first run = cimpilation = long
print("kernel numba :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
结果如下
cdist v1 : 1163.0742 ms
kernel numba (first run): 2060.0802 ms
kernel numba : 8.80377000000001 ms
由于 JIT 编译,第一个 运行 相当慢,但除此之外,改进了 120 倍!
可以通过调整 kernel_size
参数(或 overlap
)从该算法中获得更多信息。当前选择kernel_size
只对少量源点有效。例如,此选择因 source_points=np.argwhere(a>0.85)
(13 秒)而惨败,而手动设置 kernel_size=5
在 22 毫秒内给出答案。
我希望我的 post 不会(不必要地)太复杂,我真的不知道如何更好地组织它。
[编辑 2]:
我对代码的 non-numba 部分给予了更多关注,并设法获得了相当显着的加速,非常接近 numba 可以实现的目标:这是函数的新版本 apply_kernel
:
def apply_kernel(sources, sqdist, kern_size, n, mask):
ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1))
ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1))
kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
mask_kernel = kernel > kern_size**2
for pi, pj in sources:
imin = pi-kern_size
jmin = pj-kern_size
imax = pi+kern_size+1
jmax = pj+kern_size+1
if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside
sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax])
mask[imin:imax,jmin:jmax] *= mask_kernel
elif imax < n and imin >=0:
ind_j = (pj+ker_j.ravel())%n
sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j])
mask[imin:imax,ind_j] *= mask_kernel
elif jmax < n and jmin >=0:
ind_i = (pi+ker_i.ravel())%n
sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax])
mask[ind_i,jmin:jmax] *= mask_kernel
else :
ind_i = (pi+ker_i)%n
ind_j = (pj+ker_j)%n
sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
mask[ind_i,ind_j] *= mask_kernel
主要的优化是
- 使用切片索引(而不是密集数组)
- 稀疏索引的使用(我之前怎么没想到)
测试
overlap=5.4
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel v2 :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
给予
cdist v1 : 1209.8163000000002 ms
kernel v2 : 11.319049999999997 ms
这是对 cdist 的 100 倍改进,比以前的 numpy-only 版本改进了 ~5.5 倍,只比我用 numba 实现的速度慢了 ~25%。
我将从图像处理的角度展示另一种方法,您可能会感兴趣,无论它是否最快。为了方便起见,我只实现了一个奇数 n
.
与其考虑一组 nxn
点 i
,不如考虑 nxn
框。我们可以将其视为二值图像。让 j
中的每个点都成为该图像中的正像素。对于 n=5
,这看起来像:
现在让我们考虑图像处理中的另一个概念:膨胀。对于任何输入像素,如果它的 neighborhood
中有一个正像素,则输出像素将为 1。这个邻域由所谓的 Structuring Element
定义:一个布尔核,其中那些将显示哪个要考虑的邻居。
以下是我为这个问题定义 SE 的方式:
Y, X = np.ogrid[-n:n+1, -n:n+1]
SQ = X*X+Y*Y
H = SQ == r
直觉上,H 是一个掩码,表示“满足方程 x*x+y*y=r
的中心点的所有点。也就是说,H 中的所有点都位于距中心 sqrt(r)
处。另一个可视化,它会非常清楚:
这是一个不断扩大的像素圈。每个蒙版中的每个白色像素表示与中心像素的距离恰好为 sqrt(r)
的点。您可能还可以看出,如果我们迭代地增加 r
的值,我们实际上会稳定地覆盖特定位置周围的所有像素位置,最终覆盖整个图像。 (r 的一些值没有给出响应,因为对于任何一对点都不存在这样的距离 sqrt(r)。我们跳过那些 r 值——比如 3。)
这就是主要算法的作用。
- 我们将从 0 开始逐渐增加
r
的值到某个上限。
- 在每一步中,如果图像中的任何位置 (x,y) 给出了对膨胀的响应,则意味着在距离它恰好 sqrt(r) 距离处有一个 j 点!
- 我们可以多次找到匹配;我们将只保留第一场比赛并放弃更多比赛以获得积分。我们这样做直到所有像素 (x,y) 都找到它们的最小距离/第一个匹配项。
所以你可以说这个算法依赖于 nxn 图像中唯一距离对的数量。
这也意味着如果你在 j 中的点越来越多,算法实际上会变得更快,这违反常识!
这个扩张算法的最坏情况是当你有最少数量的点(正好是 j 中的一个点)时,因为那时它需要将 r 迭代到一个非常高的值才能从很远的点获得匹配.
在实施方面:
n=5 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
I=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
J=np.argwhere(a>0.85)
Y, X = np.ogrid[-n:n+1, -n:n+1]
SQ = X*X+Y*Y
point_space = np.zeros((n, n))
point_space[J[:,0], J[:,1]] = 1
C1 = point_space[:, :n//2]
C2 = point_space[:, n//2+1:]
C = np.hstack([C2, point_space, C1])
D1 = point_space[:n//2, :]
D2 = point_space[n//2+1:, :]
D2_ = np.hstack([point_space[n//2+1:, n//2+1:],D2,point_space[n//2+1:, :n//2]])
D1_ = np.hstack([point_space[:n//2:, n//2+1:],D1,point_space[:n//2, :n//2]])
D = np.vstack([D2_, C, D1_])
p = (3*n-len(D))//2
D = np.pad(D, (p,p), constant_values=(0,0))
plt.imshow(D, cmap='gray')
plt.title(f'n={n}')
如果你看一下 n=5 的图像,你就会知道我做了什么;我只是简单地用四个象限填充图像以表示循环 space,然后添加一些额外的零填充以解决最坏情况下的搜索边界。
@nb.jit
def dilation(image, output, kernel, N, i0, i1):
for i in range(i0,i1):
for j in range(i0, i1):
a_0 = i-(N//2)
a_1 = a_0+N
b_0 = j-(N//2)
b_1 = b_0+N
neighborhood = image[a_0:a_1, b_0:b_1]*kernel
if np.any(neighborhood):
output[i-i0,j-i0] = 1
return output
@nb.njit(cache=True)
def progressive_dilation(point_space, out, total, dist, SQ, n, N_):
for i in range(N_):
if not np.any(total):
break
H = SQ == i
rows, cols = np.nonzero(H)
if len(rows) == 0: continue
rmin, rmax = rows.min(), rows.max()
cmin, cmax = cols.min(), cols.max()
H_ = H[rmin:rmax+1, cmin:cmax+1]
out[:] = False
out = dilation(point_space, out, H_, len(H_), n, 2*n)
idx = np.logical_and(out, total)
for a, b in zip(*np.where(idx)):
dist[a, b] = i
total = total * np.logical_not(out)
return dist
def dilateWrap(D, SQ, n):
out = np.zeros((n,n), dtype=bool)
total = np.ones((n,n), dtype=bool)
dist=-1*np.ones((n,n))
dist = progressive_dilation(D, out, total, dist, SQ, n, 2*n*n+1)
return dist
dout = dilateWrap(D, SQ, n)
如果我们将 dout 可视化,我们实际上可以获得距离的绝佳视觉表示。
暗点基本上是存在 j 个点的位置。最亮点自然意味着离任何 j 最远的点。请注意,我将值保留为平方形式以获得整数图像。实际距离仍然相差一个平方根。结果与球场算法的输出相匹配。
# after resetting n = 501 and rerunning the first block
N = J.copy()
NE = J.copy()
E = J.copy()
SE = J.copy()
S = J.copy()
SW = J.copy()
W = J.copy()
NW = J.copy()
N[:,1] = N[:,1] - n
NE[:,0] = NE[:,0] - n
NE[:,1] = NE[:,1] - n
E[:,0] = E[:,0] - n
SE[:,0] = SE[:,0] - n
SE[:,1] = SE[:,1] + n
S[:,1] = S[:,1] + n
SW[:,0] = SW[:,0] + n
SW[:,1] = SW[:,1] + n
W[:,0] = W[:,0] + n
NW[:,0] = NW[:,0] + n
NW[:,1] = NW[:,1] - n
def distBP(I,J):
tree = BallTree(np.concatenate([J,N,E,S,W,NE,SE,SW,NW]), leaf_size=15, metric='euclidean')
dist = tree.query(I, k=1, return_distance=True)
minimum_distance = dist[0].reshape(n,n)
return minimum_distance
print(np.array_equal(distBP(I,J), np.sqrt(dilateWrap(D, SQ, n))))
输出:
True
现在检查时间,在 n=501。
from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: dilateWrap(D, SQ, n),number=nl))
输出:
ball tree: 1.1706031339999754
dilation: 1.086665302000256
我会说它们大致相等,尽管膨胀有非常小的优势。其实dilation还是少了一个平方根运算,补上吧
from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))
输出:
ball tree: 1.1712950239998463
dilation: 1.092416919000243
平方根对时间的影响基本可以忽略
现在,我之前说过,当 j 中实际上有更多点时,膨胀会变得更快。所以让我们增加j中的点数。
n=501 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
I=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
J=np.argwhere(a>0.4) # previously a>0.85
现在查看时间:
from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))
输出:
ball tree: 3.3354218500007846
dilation: 0.2178608220001479
球树实际上变慢了,而膨胀变快了!这是因为如果有很多 j 个点,我们可以通过几次重复的扩张快速找到所有距离。我觉得这个效果很有趣——通常你会认为运行时间会随着点数的增加而变差,但这里恰恰相反。
相反,如果我们减少 j,我们会看到膨胀变慢:
#Setting a>0.9
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))
输出:
ball tree: 1.010353464000218
dilation: 1.4776274510004441
我认为我们可以安全地得出结论,基于卷积或内核的方法在这个特定问题上会提供更好的收益,而不是基于对、点或树的方法。
最后,开头已经提到了,我再说一遍:整个实现只考虑n的奇数;我没有耐心为偶数 n 计算适当的填充。 (如果你熟悉图像处理,你可能以前遇到过这个问题:有了 odd 一切都变得更容易了大小。)
这还可以进一步优化,因为我只是偶尔涉足 numba。
我知道如何计算数组中各点之间的欧氏距离 scipy.spatial.distance.cdist
类似于这个问题的答案:
但是,我想在假设循环边界条件的情况下进行计算,例如因此在这种情况下,点 [0,0] 与点 [0,n-1] 的距离为 1,而不是 n-1 的距离。 (然后我将为目标细胞阈值距离内的所有点制作一个掩码,但这不是问题的核心)。
我能想到的唯一方法就是重复计算9次,域索引在x,y和x&y方向上有n added/subtracted,然后堆叠结果并找到最小值跨越 9 个切片。为了说明重复 9 次的必要性,我整理了一个简单的示意图,其中只有 1 个 J 点,用圆圈标记,它显示了一个示例,在这种情况下,三角形标记的单元格在域中有其最近的邻居反映到左上角。
这是我使用 cdist 为此开发的代码:
import numpy as np
from scipy import spatial
n=5 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
i=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
j=np.argwhere(a>0.85) # set of J locations to find distance to.
# this will be used in the KDtree soln
global maxdist
maxdist=2.0
def dist_v1(i,j):
dist=[]
# 3x3 search required for periodic boundaries.
for xoff in [-n,0,n]:
for yoff in [-n,0,n]:
jo=j.copy()
jo[:,0]-=xoff
jo[:,1]-=yoff
dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1))
dist=np.amin(np.stack(dist),0).reshape([n,n])
return(dist)
这有效,并产生例如:
print(dist_v1(i,j))
[[1.41421356 1. 1.41421356 1.41421356 1. ]
[2.23606798 2. 1.41421356 1. 1.41421356]
[2. 2. 1. 0. 1. ]
[1.41421356 1. 1.41421356 1. 1. ]
[1. 0. 1. 1. 0. ]]
零显然标记了 J 点,并且距离是正确的(此 EDIT 纠正了我之前不正确的尝试)。
请注意,如果您更改最后两行以堆叠原始距离,然后仅使用一个最小值,如下所示:
def dist_v2(i,j):
dist=[]
# 3x3 search required for periodic boundaries.
for xoff in [-n,0,n]:
for yoff in [-n,0,n]:
jo=j.copy()
jo[:,0]-=xoff
jo[:,1]-=yoff
dist.append(spatial.distance.cdist(i,jo,metric='euclidean'))
dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n])
return(dist)
对于较小的 n (<10) 速度更快,但对于较大的数组 (n>10)
慢得多...但无论哪种方式,对于我的大型阵列(N=500 和 J 点数量大约为 70),它都是 慢 ,此搜索占用了大约 99%计算时间,(使用循环也有点难看)- 有 better/faster 方法吗?
我想到的其他选项是:
- scipy.spatial.KDTree.query_ball_point
进一步搜索我发现有一个函数 scipy.spatial.KDTree.query_ball_point 直接计算我的 J 点半径内的坐标,但它似乎没有任何使用周期性边界的工具,所以我认为仍然需要以某种方式使用 3x3 循环、堆栈和然后像我上面那样使用amin,所以我不确定这是否会更快。
我使用这个函数编写了一个解决方案,没有担心周期性边界条件(即这没有回答我的问题)
def dist_v3(n,j):
x, y = np.mgrid[0:n, 0:n]
points = np.c_[x.ravel(), y.ravel()]
tree=spatial.KDTree(points)
mask=np.zeros([n,n])
for results in tree.query_ball_point((j), maxdist):
mask[points[results][:,0],points[results][:,1]]=1
return(mask)
也许我没有以最有效的方式使用它,但这已经和我基于 cdist 的解决方案一样慢,即使没有周期性边界。在两个 cdist 解决方案中包括掩码函数,即将这些函数中的 return(dist)
替换为 return(np.where(dist<=maxdist,1,0))
,然后使用 timeit,我得到以下 n=100 的计时:
from timeit import timeit
print("cdist v1:",timeit(lambda: dist_v1(i,j), number=3)*100)
print("cdist v2:",timeit(lambda: dist_v2(i,j), number=3)*100)
print("KDtree:", timeit(lambda: dist_v3(n,j), number=3)*100)
cdist v1: 181.80927299981704
cdist v2: 554.8205785999016
KDtree: 605.119637199823
为设置距离 [0,0] 内的点创建一个相对坐标数组,然后手动遍历 J 点,使用此相对点列表设置掩码 - 这具有优点是“相对距离”计算只执行一次(我的 J 点在每个时间步都改变),但我怀疑循环会很慢。
为 2D 域中的每个点预先计算一组掩码,因此在模型集成的每个时间步长中,我只需为 J 点选择掩码并应用。这将使用大量内存(与 n^4 成正比)并且可能仍然很慢,因为您需要遍历 J 个点来组合掩码。
这是您的代码的固定版本和速度更快的不同方法。他们给出了相同的结果,所以我有理由相信他们是正确的:
import numpy as np
from scipy.spatial.distance import squareform, pdist, cdist
from numpy.linalg import norm
def pb_OP(A, p=1.0):
distl = []
for *offs, ct in [(0, 0, 0), (0, p, 1), (p, 0, 1), (p, p, 1), (-p, p, 1)]:
B = A - offs
distl.append(cdist(B, A, metric='euclidean'))
if ct:
distl.append(distl[-1].T)
return np.amin(np.dstack(distl), 2)
def pb_pp(A, p=1.0):
out = np.empty((2, A.shape[0]*(A.shape[0]-1)//2))
for o, i in zip(out, A.T):
pdist(i[:, None], 'cityblock', out=o)
out[out > p/2] -= p
return squareform(norm(out, axis=0))
test = np.random.random((1000, 2))
assert np.allclose(pb_OP(test), pb_pp(test))
from timeit import timeit
t_OP = timeit(lambda: pb_OP(test), number=10)*100
t_pp = timeit(lambda: pb_pp(test), number=10)*100
print('OP', t_OP)
print('pp', t_pp)
样本运行。 1000 点:
OP 210.11001259903423
pp 22.288734700123314
我们看到我的方法快了约 9 倍,巧合的是 OP 版本必须检查的偏移量配置数量。它在各个坐标上使用 pdist
来获得绝对差异。如果这些大于网格间距的一半,我们减去一个周期。它仍然采用欧几里德范数并解压缩存储。
这些是我计时的 8 种不同的解决方案,一些是我自己的,一些是针对我的问题发布的,它们使用 4 种广泛的方法:
- 空间 cdist
- 空间KDtree
- Sklearn BallTree
- 内核方法
这是包含 8 个测试例程的代码:
import numpy as np
from scipy import spatial
from sklearn.neighbors import BallTree
n=500 # size of 2D box
f=200./(n*n) # first number is rough number of target cells...
np.random.seed(1) # to make reproducable
a=np.random.uniform(size=(n,n))
i=np.argwhere(a>-1) # all points, we want to know distance to nearest point
j=np.argwhere(a>1.0-f) # set of locations to find distance to.
# long array of 3x3 j points:
for xoff in [0,n,-n]:
for yoff in [0,-n,n]:
if xoff==0 and yoff==0:
j9=j.copy()
else:
jo=j.copy()
jo[:,0]+=xoff
jo[:,1]+=yoff
j9=np.vstack((j9,jo))
global maxdist
maxdist=10
overlap=5.2
kernel_size=int(np.sqrt(overlap*n**2/j.shape[0])/2)
print("no points",len(j))
# repear cdist over each member of 3x3 block
def dist_v1(i,j):
dist=[]
# 3x3 search required for periodic boundaries.
for xoff in [-n,0,n]:
for yoff in [-n,0,n]:
jo=j.copy()
jo[:,0]+=xoff
jo[:,1]+=yoff
dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1))
dist=np.amin(np.stack(dist),0).reshape([n,n])
#dmask=np.where(dist<=maxdist,1,0)
return(dist)
# same as v1, but taking one amin function at the end
def dist_v2(i,j):
dist=[]
# 3x3 search required for periodic boundaries.
for xoff in [-n,0,n]:
for yoff in [-n,0,n]:
jo=j.copy()
jo[:,0]+=xoff
jo[:,1]+=yoff
dist.append(spatial.distance.cdist(i,jo,metric='euclidean'))
dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n])
#dmask=np.where(dist<=maxdist,1,0)
return(dist)
# using a KDTree query ball points, looping over j9 points as in online example
def dist_v3(n,j):
x,y=np.mgrid[0:n,0:n]
points=np.c_[x.ravel(), y.ravel()]
tree=spatial.KDTree(points)
mask=np.zeros([n,n])
for results in tree.query_ball_point((j), 2.1):
mask[points[results][:,0],points[results][:,1]]=1
return(mask)
# using ckdtree query on the j9 long array
def dist_v4(i,j):
tree=spatial.cKDTree(j)
dist,minid=tree.query(i)
return(dist.reshape([n,n]))
# back to using Cdist, but on the long j9 3x3 array, rather than on each element separately
def dist_v5(i,j):
# 3x3 search required for periodic boundaries.
dist=np.amin(spatial.distance.cdist(i,j,metric='euclidean'),1)
#dmask=np.where(dist<=maxdist,1,0)
return(dist)
def dist_v6(i,j):
tree = BallTree(j,leaf_size=5,metric='euclidean')
dist = tree.query(i, k=1, return_distance=True)
mindist = dist[0].reshape(n,n)
return(mindist)
def sq_distance(x1, y1, x2, y2, n):
# computes the pairwise squared distance between 2 sets of points (with periodicity)
# x1, y1 : coordinates of the first set of points (source)
# x2, y2 : same
dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2)
dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2)
d = (dx*dx + dy*dy)
return d
def apply_kernel1(sources, sqdist, kern_size, n, mask):
ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij")
kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
mask_kernel = kernel > kern_size**2
for pi, pj in sources:
ind_i = (pi+ker_i)%n
ind_j = (pj+ker_j)%n
sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
mask[ind_i,ind_j] *= mask_kernel
def apply_kernel2(sources, sqdist, kern_size, n, mask):
ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1))
ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1))
kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
mask_kernel = kernel > kern_size**2
for pi, pj in sources:
imin = pi-kern_size
jmin = pj-kern_size
imax = pi+kern_size+1
jmax = pj+kern_size+1
if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside
sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax])
mask[imin:imax,jmin:jmax] *= mask_kernel
elif imax < n and imin >=0:
ind_j = (pj+ker_j.ravel())%n
sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j])
mask[imin:imax,ind_j] *= mask_kernel
elif jmax < n and jmin >=0:
ind_i = (pi+ker_i.ravel())%n
sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax])
mask[ind_i,jmin:jmax] *= mask_kernel
else :
ind_i = (pi+ker_i)%n
ind_j = (pj+ker_j)%n
sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
mask[ind_i,ind_j] *= mask_kernel
def dist_v7(sources, n, kernel_size,method):
sources = np.asfortranarray(sources) #for memory contiguity
kernel_size = min(kernel_size, n//2)
kernel_size = max(kernel_size, 1)
sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2)
mask = np.ones((n,n), dtype=bool) #which points have not been reached?
#main code
if (method==1):
apply_kernel1(sources, sqdist, kernel_size, n, mask)
else:
apply_kernel2(sources, sqdist, kernel_size, n, mask)
#remaining points
rem_i, rem_j = np.nonzero(mask)
if len(rem_i) > 0:
sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0)
sqdist[rem_i, rem_j] = sq_d
return np.sqrt(sqdist)
from timeit import timeit
nl=10
print ("-----------------------")
print ("Timings for ",nl,"loops")
print ("-----------------------")
print("1. cdist looped amin:",timeit(lambda: dist_v1(i,j),number=nl))
print("2. cdist single amin:",timeit(lambda: dist_v2(i,j),number=nl))
print("3. KDtree ball pt:", timeit(lambda: dist_v3(n,j9),number=nl))
print("4. KDtree query:",timeit(lambda: dist_v4(i,j9),number=nl))
print("5. cdist long list:",timeit(lambda: dist_v5(i,j9),number=nl))
print("6. ball tree:",timeit(lambda: dist_v6(i,j9),number=nl))
print("7. kernel orig:", timeit(lambda: dist_v7(j, n, kernel_size,1), number=nl))
print("8. kernel optimised:", timeit(lambda: dist_v7(j, n, kernel_size,2), number=nl))
我的 linux 12 核台式机(48GB RAM)上 n=350 和 63 点的输出(以秒为单位):
no points 63
-----------------------
Timings for 10 loops
-----------------------
1. cdist looped amin: 3.2488364999881014
2. cdist single amin: 6.494611179979984
3. KDtree ball pt: 5.180531410995172
4. KDtree query: 0.9377906009904109
5. cdist long list: 3.906166430999292
6. ball tree: 3.3540162370190956
7. kernel orig: 0.7813036740117241
8. kernel optimised: 0.17046571199898608
并且对于 n=500 和 npts=176:
no points 176
-----------------------
Timings for 10 loops
-----------------------
1. cdist looped amin: 16.787221198988846
2. cdist single amin: 40.97849371898337
3. KDtree ball pt: 9.926229109987617
4. KDtree query: 0.8417396580043714
5. cdist long list: 14.345821461000014
6. ball tree: 1.8792325239919592
7. kernel orig: 1.0807358759921044
8. kernel optimised: 0.5650744160229806
所以综上所述我得出了以下结论:
- 如果你有相当大的问题,请避免使用 cdist
- 如果您的问题不是太 computational-time 受限,我会推荐“KDtree 查询”方法,因为它只有 2 行没有周期性边界,还有几行有周期性边界来设置 j9 数组
- 为了获得最佳性能(例如,模型的长期集成,在我的情况下,每个时间步都需要这样做),那么 Kernal 解决方案现在是迄今为止最快的。
对于计算多个距离,我认为很难击败简单的 BallTree(或类似的)。
我不太明白循环边界,或者至少为什么你需要循环 3x3 次,正如我所看到的那样,它的行为类似于圆环,足以制作 5 份副本。 更新:确实你需要 3x3 的边缘。我更新了代码。
为了确保我的 minimum_distance
是正确的,为 n = 200
做了一个 np.all( minimum_distance == dist_v1(i,j) )
测试 True
.
对于使用提供的代码生成的 n = 500
,冷启动的 %%time
给出了
CPU times: user 1.12 s, sys: 0 ns, total: 1.12 s
Wall time: 1.11 s
所以我生成了 500 个数据点,就像 post
import numpy as np
n=500 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
i=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
j=np.argwhere(a>0.85) # set of J locations to find distance to.
并使用 BallTree
import numpy as np
from sklearn.neighbors import BallTree
N = j.copy()
NE = j.copy()
E = j.copy()
SE = j.copy()
S = j.copy()
SW = j.copy()
W = j.copy()
NW = j.copy()
N[:,1] = N[:,1] - n
NE[:,0] = NE[:,0] - n
NE[:,1] = NE[:,1] - n
E[:,0] = E[:,0] - n
SE[:,0] = SE[:,0] - n
SE[:,1] = SE[:,1] + n
S[:,1] = S[:,1] + n
SW[:,0] = SW[:,0] + n
SW[:,1] = SW[:,1] + n
W[:,0] = W[:,0] + n
NW[:,0] = NW[:,0] + n
NW[:,1] = NW[:,1] - n
tree = BallTree(np.concatenate([j,N,E,S,W,NE,SE,SW,NW]), leaf_size=15, metric='euclidean')
dist = tree.query(i, k=1, return_distance=True)
minimum_distance = dist[0].reshape(n,n)
更新:
注意这里我把数据复制到盒子的N,E,S,W,NE,SE,NW,SE,来处理边界条件。同样,对于 n = 200
这给出了相同的结果。你可以调整leaf_size
,但我觉得这个设置没问题。
性能对j
中的点数敏感。
[编辑] - 我发现代码跟踪工作完成点的方式存在错误,用 mask_kernel
修复了它。较新代码的纯 python 版本慢了约 1.5 倍,但 numba 版本稍快(由于其他一些优化)。
[当前最佳:原始速度的 100 倍到 120 倍]
首先感谢您提交这个问题,我在优化过程中得到了很多乐趣!
我目前的最佳解决方案依赖于网格是规则的并且“源”点(我们需要从中计算距离的点)大致均匀分布的假设。
这里的想法是所有的距离都将是 1,sqrt(2)
,sqrt(3)
,...所以我们可以预先进行数值计算。然后我们简单地将这些值放在一个矩阵中,并在每个源点周围复制该矩阵(并确保保留在每个点找到的最小值)。这涵盖了绝大多数点 (>99%)。然后我们对剩下的1%应用另一种更“经典”的方法。
代码如下:
import numpy as np
def sq_distance(x1, y1, x2, y2, n):
# computes the pairwise squared distance between 2 sets of points (with periodicity)
# x1, y1 : coordinates of the first set of points (source)
# x2, y2 : same
dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2)
dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2)
d = (dx*dx + dy*dy)
return d
def apply_kernel(sources, sqdist, kern_size, n, mask):
ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij")
kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
mask_kernel = kernel > kern_size**2
for pi, pj in sources:
ind_i = (pi+ker_i)%n
ind_j = (pj+ker_j)%n
sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
mask[ind_i,ind_j] *= mask_kernel
def dist_vf(sources, n, kernel_size):
sources = np.asfortranarray(sources) #for memory contiguity
kernel_size = min(kernel_size, n//2)
kernel_size = max(kernel_size, 1)
sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2)
mask = np.ones((n,n), dtype=bool) #which points have not been reached?
#main code
apply_kernel(sources, sqdist, kernel_size, n, mask)
#remaining points
rem_i, rem_j = np.nonzero(mask)
if len(rem_i) > 0:
sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0)
sqdist[rem_i, rem_j] = sq_d
#eff = 1-rem_i.size/n**2
#print("covered by kernel :", 100*eff, "%")
#print("overlap :", sources.shape[0]*(1+2*kernel_size)**2/n**2)
#print()
return np.sqrt(sqdist)
正在使用
测试此版本n=500 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
all_points=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
source_points=np.argwhere(a>1-70/n**2) # set of J locations to find distance to.
#
# code for dist_v1 and dist_vf
#
overlap=5.2
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel version:", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
给予
cdist v1 : 1148.6694 ms
kernel version: 69.21876999999998 ms
这已经是 ~17 倍的加速了!我还实现了 sq_distance
和 apply_kernel
的 numba 版本:[这是新的正确版本]
@njit(cache=True)
def sq_distance(x1, y1, x2, y2, n):
m1 = x1.size
m2 = x2.size
n2 = n//2
d = np.empty((m1,m2), dtype=np.int32)
for i in range(m1):
for j in range(m2):
dx = np.abs(x1[i] - x2[j] + n2)%n - n2
dy = np.abs(y1[i] - y2[j] + n2)%n - n2
d[i,j] = (dx*dx + dy*dy)
return d
@njit(cache=True)
def apply_kernel(sources, sqdist, kern_size, n, mask):
# creating the kernel
kernel = np.empty((2*kern_size+1, 2*kern_size+1))
vals = np.arange(-kern_size, kern_size+1)**2
for i in range(2*kern_size+1):
for j in range(2*kern_size+1):
kernel[i,j] = vals[i] + vals[j]
mask_kernel = kernel > kern_size**2
I = sources[:,0]
J = sources[:,1]
# applying the kernel for each point
for l in range(sources.shape[0]):
pi = I[l]
pj = J[l]
if pj - kern_size >= 0 and pj + kern_size<n: #if we are in the middle, no need to do the modulo for j
for i in range(2*kern_size+1):
ind_i = np.mod((pi+i-kern_size), n)
for j in range(2*kern_size+1):
ind_j = (pj+j-kern_size)
sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]
else:
for i in range(2*kern_size+1):
ind_i = np.mod((pi+i-kern_size), n)
for j in range(2*kern_size+1):
ind_j = np.mod((pj+j-kern_size), n)
sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]
return
并使用
进行测试overlap=5.2
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel numba (first run):", timeit(lambda: dist_vf(source_points, n, kernel_size), number=1)*1000, "ms") #first run = cimpilation = long
print("kernel numba :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
结果如下
cdist v1 : 1163.0742 ms
kernel numba (first run): 2060.0802 ms
kernel numba : 8.80377000000001 ms
由于 JIT 编译,第一个 运行 相当慢,但除此之外,改进了 120 倍!
可以通过调整 kernel_size
参数(或 overlap
)从该算法中获得更多信息。当前选择kernel_size
只对少量源点有效。例如,此选择因 source_points=np.argwhere(a>0.85)
(13 秒)而惨败,而手动设置 kernel_size=5
在 22 毫秒内给出答案。
我希望我的 post 不会(不必要地)太复杂,我真的不知道如何更好地组织它。
[编辑 2]:
我对代码的 non-numba 部分给予了更多关注,并设法获得了相当显着的加速,非常接近 numba 可以实现的目标:这是函数的新版本 apply_kernel
:
def apply_kernel(sources, sqdist, kern_size, n, mask):
ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1))
ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1))
kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
mask_kernel = kernel > kern_size**2
for pi, pj in sources:
imin = pi-kern_size
jmin = pj-kern_size
imax = pi+kern_size+1
jmax = pj+kern_size+1
if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside
sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax])
mask[imin:imax,jmin:jmax] *= mask_kernel
elif imax < n and imin >=0:
ind_j = (pj+ker_j.ravel())%n
sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j])
mask[imin:imax,ind_j] *= mask_kernel
elif jmax < n and jmin >=0:
ind_i = (pi+ker_i.ravel())%n
sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax])
mask[ind_i,jmin:jmax] *= mask_kernel
else :
ind_i = (pi+ker_i)%n
ind_j = (pj+ker_j)%n
sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
mask[ind_i,ind_j] *= mask_kernel
主要的优化是
- 使用切片索引(而不是密集数组)
- 稀疏索引的使用(我之前怎么没想到)
测试
overlap=5.4
kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
print("cdist v1 :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
print("kernel v2 :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
给予
cdist v1 : 1209.8163000000002 ms
kernel v2 : 11.319049999999997 ms
这是对 cdist 的 100 倍改进,比以前的 numpy-only 版本改进了 ~5.5 倍,只比我用 numba 实现的速度慢了 ~25%。
我将从图像处理的角度展示另一种方法,您可能会感兴趣,无论它是否最快。为了方便起见,我只实现了一个奇数 n
.
与其考虑一组 nxn
点 i
,不如考虑 nxn
框。我们可以将其视为二值图像。让 j
中的每个点都成为该图像中的正像素。对于 n=5
,这看起来像:
现在让我们考虑图像处理中的另一个概念:膨胀。对于任何输入像素,如果它的 neighborhood
中有一个正像素,则输出像素将为 1。这个邻域由所谓的 Structuring Element
定义:一个布尔核,其中那些将显示哪个要考虑的邻居。
以下是我为这个问题定义 SE 的方式:
Y, X = np.ogrid[-n:n+1, -n:n+1]
SQ = X*X+Y*Y
H = SQ == r
直觉上,H 是一个掩码,表示“满足方程 x*x+y*y=r
的中心点的所有点。也就是说,H 中的所有点都位于距中心 sqrt(r)
处。另一个可视化,它会非常清楚:
这是一个不断扩大的像素圈。每个蒙版中的每个白色像素表示与中心像素的距离恰好为 sqrt(r)
的点。您可能还可以看出,如果我们迭代地增加 r
的值,我们实际上会稳定地覆盖特定位置周围的所有像素位置,最终覆盖整个图像。 (r 的一些值没有给出响应,因为对于任何一对点都不存在这样的距离 sqrt(r)。我们跳过那些 r 值——比如 3。)
这就是主要算法的作用。
- 我们将从 0 开始逐渐增加
r
的值到某个上限。 - 在每一步中,如果图像中的任何位置 (x,y) 给出了对膨胀的响应,则意味着在距离它恰好 sqrt(r) 距离处有一个 j 点!
- 我们可以多次找到匹配;我们将只保留第一场比赛并放弃更多比赛以获得积分。我们这样做直到所有像素 (x,y) 都找到它们的最小距离/第一个匹配项。
所以你可以说这个算法依赖于 nxn 图像中唯一距离对的数量。
这也意味着如果你在 j 中的点越来越多,算法实际上会变得更快,这违反常识!
这个扩张算法的最坏情况是当你有最少数量的点(正好是 j 中的一个点)时,因为那时它需要将 r 迭代到一个非常高的值才能从很远的点获得匹配.
在实施方面:
n=5 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
I=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
J=np.argwhere(a>0.85)
Y, X = np.ogrid[-n:n+1, -n:n+1]
SQ = X*X+Y*Y
point_space = np.zeros((n, n))
point_space[J[:,0], J[:,1]] = 1
C1 = point_space[:, :n//2]
C2 = point_space[:, n//2+1:]
C = np.hstack([C2, point_space, C1])
D1 = point_space[:n//2, :]
D2 = point_space[n//2+1:, :]
D2_ = np.hstack([point_space[n//2+1:, n//2+1:],D2,point_space[n//2+1:, :n//2]])
D1_ = np.hstack([point_space[:n//2:, n//2+1:],D1,point_space[:n//2, :n//2]])
D = np.vstack([D2_, C, D1_])
p = (3*n-len(D))//2
D = np.pad(D, (p,p), constant_values=(0,0))
plt.imshow(D, cmap='gray')
plt.title(f'n={n}')
如果你看一下 n=5 的图像,你就会知道我做了什么;我只是简单地用四个象限填充图像以表示循环 space,然后添加一些额外的零填充以解决最坏情况下的搜索边界。
@nb.jit
def dilation(image, output, kernel, N, i0, i1):
for i in range(i0,i1):
for j in range(i0, i1):
a_0 = i-(N//2)
a_1 = a_0+N
b_0 = j-(N//2)
b_1 = b_0+N
neighborhood = image[a_0:a_1, b_0:b_1]*kernel
if np.any(neighborhood):
output[i-i0,j-i0] = 1
return output
@nb.njit(cache=True)
def progressive_dilation(point_space, out, total, dist, SQ, n, N_):
for i in range(N_):
if not np.any(total):
break
H = SQ == i
rows, cols = np.nonzero(H)
if len(rows) == 0: continue
rmin, rmax = rows.min(), rows.max()
cmin, cmax = cols.min(), cols.max()
H_ = H[rmin:rmax+1, cmin:cmax+1]
out[:] = False
out = dilation(point_space, out, H_, len(H_), n, 2*n)
idx = np.logical_and(out, total)
for a, b in zip(*np.where(idx)):
dist[a, b] = i
total = total * np.logical_not(out)
return dist
def dilateWrap(D, SQ, n):
out = np.zeros((n,n), dtype=bool)
total = np.ones((n,n), dtype=bool)
dist=-1*np.ones((n,n))
dist = progressive_dilation(D, out, total, dist, SQ, n, 2*n*n+1)
return dist
dout = dilateWrap(D, SQ, n)
如果我们将 dout 可视化,我们实际上可以获得距离的绝佳视觉表示。
暗点基本上是存在 j 个点的位置。最亮点自然意味着离任何 j 最远的点。请注意,我将值保留为平方形式以获得整数图像。实际距离仍然相差一个平方根。结果与球场算法的输出相匹配。
# after resetting n = 501 and rerunning the first block
N = J.copy()
NE = J.copy()
E = J.copy()
SE = J.copy()
S = J.copy()
SW = J.copy()
W = J.copy()
NW = J.copy()
N[:,1] = N[:,1] - n
NE[:,0] = NE[:,0] - n
NE[:,1] = NE[:,1] - n
E[:,0] = E[:,0] - n
SE[:,0] = SE[:,0] - n
SE[:,1] = SE[:,1] + n
S[:,1] = S[:,1] + n
SW[:,0] = SW[:,0] + n
SW[:,1] = SW[:,1] + n
W[:,0] = W[:,0] + n
NW[:,0] = NW[:,0] + n
NW[:,1] = NW[:,1] - n
def distBP(I,J):
tree = BallTree(np.concatenate([J,N,E,S,W,NE,SE,SW,NW]), leaf_size=15, metric='euclidean')
dist = tree.query(I, k=1, return_distance=True)
minimum_distance = dist[0].reshape(n,n)
return minimum_distance
print(np.array_equal(distBP(I,J), np.sqrt(dilateWrap(D, SQ, n))))
输出:
True
现在检查时间,在 n=501。
from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: dilateWrap(D, SQ, n),number=nl))
输出:
ball tree: 1.1706031339999754
dilation: 1.086665302000256
我会说它们大致相等,尽管膨胀有非常小的优势。其实dilation还是少了一个平方根运算,补上吧
from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))
输出:
ball tree: 1.1712950239998463
dilation: 1.092416919000243
平方根对时间的影响基本可以忽略
现在,我之前说过,当 j 中实际上有更多点时,膨胀会变得更快。所以让我们增加j中的点数。
n=501 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n))
I=np.argwhere(a>-1) # all points, for each loc we want distance to nearest J
J=np.argwhere(a>0.4) # previously a>0.85
现在查看时间:
from timeit import timeit
nl=1
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))
输出:
ball tree: 3.3354218500007846
dilation: 0.2178608220001479
球树实际上变慢了,而膨胀变快了!这是因为如果有很多 j 个点,我们可以通过几次重复的扩张快速找到所有距离。我觉得这个效果很有趣——通常你会认为运行时间会随着点数的增加而变差,但这里恰恰相反。
相反,如果我们减少 j,我们会看到膨胀变慢:
#Setting a>0.9
print("ball tree:",timeit(lambda: distBP(I,J),number=nl))
print("dilation:",timeit(lambda: np.sqrt(dilateWrap(D, SQ, n)),number=nl))
输出:
ball tree: 1.010353464000218
dilation: 1.4776274510004441
我认为我们可以安全地得出结论,基于卷积或内核的方法在这个特定问题上会提供更好的收益,而不是基于对、点或树的方法。
最后,开头已经提到了,我再说一遍:整个实现只考虑n的奇数;我没有耐心为偶数 n 计算适当的填充。 (如果你熟悉图像处理,你可能以前遇到过这个问题:有了 odd 一切都变得更容易了大小。)
这还可以进一步优化,因为我只是偶尔涉足 numba。