Python:将点分配给容器的更快或无循环方式?
Python: Faster or Loop-Free Way of Assigning Points to Bins?
我有一个包含 N 个二维点的 N×2 数组,我想将其分配给一个 M×K 的垃圾箱网格。
例如,点 [m + 0.1, k]
和 [m + 0.1, k + 0.9]
应该属于 bin [m, k]
,其中 m
和 k
都是整数。有可能某个点不属于任何 bin。
在实现方面,我希望将结果存储在逻辑 M×K×N 数组 in_bin
中,其中 in_bin[m, k, n]
是 True
,如果点 n
落入 bin [m, k]
.
这是我的做法,天真地使用双循环。
M = 10
K = 11
N = 100
pts = 20 * np.random.rand(N, 2)
in_bin = np.zeros((M, K, N), dtype=bool)
for m in range(M):
for k in range(K):
inbin_h = (pts[:, 0] >= m) & (pts[:, 0] < (m + 1))
inbin_w = (pts[:, 1] >= k) & (pts[:, 1] < (k + 1))
in_bin[m, k, np.where(inbin_h & inbin_w)[0]] = True
您可以使用 histogram2d
执行此操作,如下所示:
hist = np.dstack(np.histogram2d([pts[i,0]],[pts[i,1]],bins=[np.arange(M+1),np.arange(K+1)])[0] for i in range(len(pts)))
这只涉及对点数的单个 for 循环。如果 N 比 M*K 小得多,这应该会更快。
这是另一种没有任何 for 循环的方法,使用 searchsorted
,这是 histogram2d
使用的方法:
def bin_points(pts, m, k):
binsx = np.arange(m+1)
binsy = np.arange(k+1)
index_x = np.searchsorted(binsx,pts[:,0]) - 1
index_y = np.searchsorted(binsy,pts[:,1]) - 1
# mask out values which fall outside the bins
mask = (index_x >= 0) & (index_x < m) & (index_y >= 0) & (index_y < k)
index_x = index_x[mask]
index_y = index_y[mask]
n = np.arange(pts.shape[0])[mask]
in_bin = np.zeros((M, K, pts.shape[0]), dtype=bool)
in_bin[index_x,index_y,n] = 1
这里有一些基准:
M = 10, K = 11, N = 100
In [2]: %timeit bin_points(pts,M,K)
10000 loops, best of 3: 34.1 µs per loop
In [3]: %timeit bin_points_double_for_loop(pts,M,K)
1000 loops, best of 3: 1.71 ms per loop
In [4]: %timeit bin_points_broadcast(pts,M,K)
10000 loops, best of 3: 39.6 µs per loop
M = 100,K = 110,N = 1000
In [2]: %timeit bin_points(pts,M,K)
1000 loops, best of 3: 721 µs per loop
In [3]: %timeit bin_points_double_for_loop(pts,M,K)
1 loop, best of 3: 249 ms per loop
In [4]: %timeit bin_points_broadcast(pts,M,K)
100 loops, best of 3: 3.04 ms per loop
实际上并不需要 where
(并不是说这会大大改变速度):
In [120]: in_bin1 = np.zeros((M, K, N), dtype=bool)
...: for m in range(M):
...: for k in range(K):
...: inbin_h = (pts[:, 0] >= m) & (pts[:, 0] < (m + 1))
...: inbin_w = (pts[:, 1] >= k) & (pts[:, 1] < (k + 1))
...: in_bin1[m, k, inbin_h & inbin_w] = True
但我们可以一次为所有 m
和 k
分配:
In [125]: x0=(pts[:,0]>=np.arange(M)[:,None]) & (pts[:,0]<np.arange(1,M+1)[:,None]);
In [126]: x1=(pts[:,1]>=np.arange(K)[:,None]) & (pts[:,1]<np.arange(1,K+1)[:,None]);
In [127]: x0.shape
Out[127]: (10, 100)
In [128]: x1.shape
Out[128]: (11, 100)
将这些与广播结合起来:
In [129]: xx = x0[:,None,:] & x1[None,:,:]
In [130]: xx.shape
Out[130]: (10, 11, 100)
In [131]: np.allclose(in_bin1, xx) # and check
Out[131]: True
我们正在检查 pts
中的那些浮动 pt 数字是否在每次迭代的每个整数 bin 中。因此,我们可以使用的技巧是将那些浮动的 pt 数字转换为它们的 floored-down 数字。此外,我们需要屏蔽掉满足 range(M)
和 range(N)
的有效值。仅此而已!
这是实现 -
def binpts(pts, M, K):
N = len(pts)
in_bin_out = np.zeros((M, K, N), dtype=bool)
mask = (pts[:,0]<M) & (pts[:,1]<K)
pts_f = pts[mask]
r,c = pts_f.astype(int).T
in_bin_out[r, c, mask] = 1
return in_bin_out
基准测试
浮动点数组中 运行ge 的大型数组的计时与其大小成正比,如给定示例中提供的 -
案例 #1:
In [2]: M = 100
...: K = 101
...: N = 10000
...: np.random.seed(0)
...: pts = 2000 * np.random.rand(N, 2)
# @hpaulj's soln
In [3]: %%timeit
...: x0=(pts[:,0]>=np.arange(M)[:,None]) & (pts[:,0]<np.arange(1,M+1)[:,None])
...: x1=(pts[:,1]>=np.arange(K)[:,None]) & (pts[:,1]<np.arange(1,K+1)[:,None])
...: xx = x0[:,None,:] & x1[None,:,:]
10 loops, best of 3: 47.5 ms per loop
# @user545424's soln
In [6]: %timeit bin_points(pts,M,K)
1000 loops, best of 3: 331 µs per loop
In [7]: %timeit binpts(pts,M,K)
10000 loops, best of 3: 125 µs per loop
注意:
@hpaulj 的 soln 是内存密集型的,我 运行 在较大的上使用它时内存不足。
案例 #2:
In [8]: M = 100
...: K = 101
...: N = 100000
...: np.random.seed(0)
...: pts = 20000 * np.random.rand(N, 2)
In [9]: %timeit bin_points(pts,M,K)
...: %timeit binpts(pts,M,K)
100 loops, best of 3: 2.31 ms per loop
1000 loops, best of 3: 585 µs per loop
案例 #3:
In [10]: M = 100
...: K = 101
...: N = 1000000
...: np.random.seed(0)
...: pts = 200000 * np.random.rand(N, 2)
In [11]: %timeit bin_points(pts,M,K)
...: %timeit binpts(pts,M,K)
10 loops, best of 3: 34.6 ms per loop
100 loops, best of 3: 2.78 ms per loop
我有一个包含 N 个二维点的 N×2 数组,我想将其分配给一个 M×K 的垃圾箱网格。
例如,点 [m + 0.1, k]
和 [m + 0.1, k + 0.9]
应该属于 bin [m, k]
,其中 m
和 k
都是整数。有可能某个点不属于任何 bin。
在实现方面,我希望将结果存储在逻辑 M×K×N 数组 in_bin
中,其中 in_bin[m, k, n]
是 True
,如果点 n
落入 bin [m, k]
.
这是我的做法,天真地使用双循环。
M = 10
K = 11
N = 100
pts = 20 * np.random.rand(N, 2)
in_bin = np.zeros((M, K, N), dtype=bool)
for m in range(M):
for k in range(K):
inbin_h = (pts[:, 0] >= m) & (pts[:, 0] < (m + 1))
inbin_w = (pts[:, 1] >= k) & (pts[:, 1] < (k + 1))
in_bin[m, k, np.where(inbin_h & inbin_w)[0]] = True
您可以使用 histogram2d
执行此操作,如下所示:
hist = np.dstack(np.histogram2d([pts[i,0]],[pts[i,1]],bins=[np.arange(M+1),np.arange(K+1)])[0] for i in range(len(pts)))
这只涉及对点数的单个 for 循环。如果 N 比 M*K 小得多,这应该会更快。
这是另一种没有任何 for 循环的方法,使用 searchsorted
,这是 histogram2d
使用的方法:
def bin_points(pts, m, k):
binsx = np.arange(m+1)
binsy = np.arange(k+1)
index_x = np.searchsorted(binsx,pts[:,0]) - 1
index_y = np.searchsorted(binsy,pts[:,1]) - 1
# mask out values which fall outside the bins
mask = (index_x >= 0) & (index_x < m) & (index_y >= 0) & (index_y < k)
index_x = index_x[mask]
index_y = index_y[mask]
n = np.arange(pts.shape[0])[mask]
in_bin = np.zeros((M, K, pts.shape[0]), dtype=bool)
in_bin[index_x,index_y,n] = 1
这里有一些基准:
M = 10, K = 11, N = 100
In [2]: %timeit bin_points(pts,M,K)
10000 loops, best of 3: 34.1 µs per loop
In [3]: %timeit bin_points_double_for_loop(pts,M,K)
1000 loops, best of 3: 1.71 ms per loop
In [4]: %timeit bin_points_broadcast(pts,M,K)
10000 loops, best of 3: 39.6 µs per loop
M = 100,K = 110,N = 1000
In [2]: %timeit bin_points(pts,M,K)
1000 loops, best of 3: 721 µs per loop
In [3]: %timeit bin_points_double_for_loop(pts,M,K)
1 loop, best of 3: 249 ms per loop
In [4]: %timeit bin_points_broadcast(pts,M,K)
100 loops, best of 3: 3.04 ms per loop
实际上并不需要 where
(并不是说这会大大改变速度):
In [120]: in_bin1 = np.zeros((M, K, N), dtype=bool)
...: for m in range(M):
...: for k in range(K):
...: inbin_h = (pts[:, 0] >= m) & (pts[:, 0] < (m + 1))
...: inbin_w = (pts[:, 1] >= k) & (pts[:, 1] < (k + 1))
...: in_bin1[m, k, inbin_h & inbin_w] = True
但我们可以一次为所有 m
和 k
分配:
In [125]: x0=(pts[:,0]>=np.arange(M)[:,None]) & (pts[:,0]<np.arange(1,M+1)[:,None]);
In [126]: x1=(pts[:,1]>=np.arange(K)[:,None]) & (pts[:,1]<np.arange(1,K+1)[:,None]);
In [127]: x0.shape
Out[127]: (10, 100)
In [128]: x1.shape
Out[128]: (11, 100)
将这些与广播结合起来:
In [129]: xx = x0[:,None,:] & x1[None,:,:]
In [130]: xx.shape
Out[130]: (10, 11, 100)
In [131]: np.allclose(in_bin1, xx) # and check
Out[131]: True
我们正在检查 pts
中的那些浮动 pt 数字是否在每次迭代的每个整数 bin 中。因此,我们可以使用的技巧是将那些浮动的 pt 数字转换为它们的 floored-down 数字。此外,我们需要屏蔽掉满足 range(M)
和 range(N)
的有效值。仅此而已!
这是实现 -
def binpts(pts, M, K):
N = len(pts)
in_bin_out = np.zeros((M, K, N), dtype=bool)
mask = (pts[:,0]<M) & (pts[:,1]<K)
pts_f = pts[mask]
r,c = pts_f.astype(int).T
in_bin_out[r, c, mask] = 1
return in_bin_out
基准测试
浮动点数组中 运行ge 的大型数组的计时与其大小成正比,如给定示例中提供的 -
案例 #1:
In [2]: M = 100
...: K = 101
...: N = 10000
...: np.random.seed(0)
...: pts = 2000 * np.random.rand(N, 2)
# @hpaulj's soln
In [3]: %%timeit
...: x0=(pts[:,0]>=np.arange(M)[:,None]) & (pts[:,0]<np.arange(1,M+1)[:,None])
...: x1=(pts[:,1]>=np.arange(K)[:,None]) & (pts[:,1]<np.arange(1,K+1)[:,None])
...: xx = x0[:,None,:] & x1[None,:,:]
10 loops, best of 3: 47.5 ms per loop
# @user545424's soln
In [6]: %timeit bin_points(pts,M,K)
1000 loops, best of 3: 331 µs per loop
In [7]: %timeit binpts(pts,M,K)
10000 loops, best of 3: 125 µs per loop
注意: @hpaulj 的 soln 是内存密集型的,我 运行 在较大的上使用它时内存不足。
案例 #2:
In [8]: M = 100
...: K = 101
...: N = 100000
...: np.random.seed(0)
...: pts = 20000 * np.random.rand(N, 2)
In [9]: %timeit bin_points(pts,M,K)
...: %timeit binpts(pts,M,K)
100 loops, best of 3: 2.31 ms per loop
1000 loops, best of 3: 585 µs per loop
案例 #3:
In [10]: M = 100
...: K = 101
...: N = 1000000
...: np.random.seed(0)
...: pts = 200000 * np.random.rand(N, 2)
In [11]: %timeit bin_points(pts,M,K)
...: %timeit binpts(pts,M,K)
10 loops, best of 3: 34.6 ms per loop
100 loops, best of 3: 2.78 ms per loop