自定义搜索 numpy 数组中的连续值
Customized search for consequtive values in numpy array
请假定以下 NumPy 数组:
A = array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
我想找到这个数组中 N
个连续值等于零(含)的索引。
例如,假设 N=3
。我们知道 A[2]=0
而 A[3]>0
。这样,数组的第二个元素A
就不是连续三个零值(含)。数组 A 的理想结果如下所示:
B = array([False, False, False, False, True, True, True, True, False, False])
我可以写一个循环作为这个问题的答案:
N = 3
A = np.array([1, 1, 0, 0, 0, 0, 0, 0, 0, 0])
B = np.zeros(len(A), dtype=bool)
for i in range(len(A)):
if (i + N <= len(A)) and (sum(A[i:i + N]) == 0):
B[i] = True
由于矩阵 A 可能更大,而且我需要执行上述搜索过程数百万次,因此我需要最快的方法。你的建议是什么?
要获取自己的索引,可以从以下方法入手:
import bumpy as np
my__Truth = np.array([True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, False, True])
np.sort(my_Truth, in_place=my_Truth[0])
# ~ Obtain Index ~ #
index_values = np.where(my_Truth)
print(index_values)
# un-comment if using python2
# print index_values
我已经实施了一些您可以尝试的其他替代方案:
- 使用
numba
JIT 编译循环代码
- 使用
np.correlate
做一个signal-processing类型的相关,然后使用nonzero
获得指数
- 同上,但使用
stride_tricks
代替
我想对解决方案进行基准测试,但这里有点晚了。如果你愿意,我明天下班后可以做。
我也将 dtype=np.int8
用于 B
,因为更容易直观地确认输出是否正确(请参阅输出)。
import numpy as np
import numba
from numpy.lib.stride_tricks import as_strided
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
N = 3
def baseline(A: np.ndarray):
'''
Your solution
'''
B = np.zeros(len(A), dtype=np.int8)
for i in range(len(A)):
if (i + N <= len(A)) and (sum(A[i:i + N]) == 0):
B[i] = 1
return B
@numba.njit()
def baseline_numba(A: np.ndarray, N: int):
'''
You may need to run this once first to see performance gains
as this has to be JIT compiled by Numba
'''
B = np.zeros(len(A), dtype=np.int8)
for i in range(len(A)):
if (i + N <= len(A)) and (A[i:i + N].sum() == 0):
B[i] = 1
return B
def correlational(A: np.ndarray):
'''
Use a correlation operation
'''
B = np.zeros_like(A, dtype=np.int8)
B[(np.correlate(A, np.full(N, 1), 'valid') == 0).nonzero()[0]] = 1
return B
def stridetricks(A: np.ndarray):
'''
Same idea as using correlation, but with stride tricks
Maybe it will be faster? Probably not.
'''
u = np.array(A.itemsize)
B = np.zeros_like(A, dtype=np.int8)
B[(as_strided(A, (len(A) - N + 1, N), u * (1, 1)).sum(1) == 0).nonzero()[0]] = 1
return B
print(baseline(A))
print(baseline_numba(A, N))
print(correlational(A))
print(stridetricks(A))
输出:
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
查找连续值和相关索引是其他 SO 问题中有解决方案的主题之一。在这方面我们可以发现range indices for repeated zeros and their numbers:
def zero_runs(a):
iszero = np.concatenate(([0], np.equal(a, 0).view(np.int8), [0]))
absdiff = np.abs(np.diff(iszero))
ranges = np.where(absdiff == 1)[0].reshape(-1, 2)
return ranges
# [[ 2 3]
# [ 4 10]]
当我们找到索引 运行ges 时,我们可以 使用
def filled_array(start, end, length):
out = np.zeros((length), dtype=int)
np.add.at(out, start, 1)
np.add.at(out, end, -1)
return out.cumsum() > 0
def filled_array_v2(start, end, length):
out =np.bincount(start, minlength=length) - np.bincount(end, minlength=length)
return out.cumsum().astype(bool)
因此,通过使用上述函数并定义如下主函数:
def final(A, N):
runs = zero_runs(A)
repeats_cound = runs[:, 1] - runs[:, 0]
# [1 6]
suspected_ranges = runs[repeats_cound >= 3]
# [[ 4 10]]
start = suspected_ranges[:, 0]
end = start + N + 1
return filled_array(start, end, length=A.size) # filled_array_v2(start, end, length=A.size)
# [False False False False True True True True False False]
在我关于数据数组体积 10000
的一项基准测试中,此代码 运行 比提议的 numba 方法快 。
一些基准测试 (Colab)
通过使用 any
(而不是 sum
)作为 @Mercury 提到的循环 ,我们可以改进 Naphat 的 baseline_numba 通过在装饰器行中使用 nb.prange
和 parallel=True
并行化它。
按A尺码:20000on Colab:
# N ----> 3:
1000 loops, best of 5: 105 µs per loop Norok
--------------------
100 loops, best of 5: 35 ms per loop Nephat baseline
100 loops, best of 5: 730 µs per loop Nephat baseline_numba
1000 loops, best of 5: 225 µs per loop Nephat correlational
1000 loops, best of 5: 397 µs per loop Nephat stridetricks
--------------------
1000 loops, best of 5: 42.3 µs per loop Michael Szczesny (The fastest)
--------------------
1000 loops, best of 5: 352 µs per loop This answer
1000 loops, best of 5: 279 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 10:
1000 loops, best of 5: 25.3 µs per loop Norok (The fastest)
--------------------
100 loops, best of 5: 46.8 ms per loop Nephat baseline
100 loops, best of 5: 794 µs per loop Nephat baseline_numba
1000 loops, best of 5: 318 µs per loop Nephat correlational
1000 loops, best of 5: 433 µs per loop Nephat stridetricks
--------------------
1000 loops, best of 5: 31.9 µs per loop Michael Szczesny
--------------------
1000 loops, best of 5: 341 µs per loop This answer
1000 loops, best of 5: 291 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 20
1000 loops, best of 5: 6.52 µs per loop Norok (The fastest)
--------------------
1000 loops, best of 5: 31.5 µs per loop Michael Szczesny
--------------------
1000 loops, best of 5: 350 µs per loop This answer
1000 loops, best of 5: 292 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 30
1000 loops, best of 5: 4.01 µs per loop Norok (The fastest)
--------------------
1000 loops, best of 5: 31.8 µs per loop Michael Szczesny
最新基准:
根据我对 the prepared colab link(最后两个单元格)的上次测试,以下结果分别显示了最佳性能:
- for
N <10
---> 理查德,(==
) 丹尼尔
- for
10 < N <20
---> Richard, Michael, (==
) Norok
- for
20 < N <60
---> Norok, Richard, Michael
- for
N >60
---> Norok, Michael, Richard
具有线性时间复杂度的解决方案,使用numba
编译。
想法是查看 A 的每个元素一次并计算计数器 c 中零的出现次数。如果遇到 non-zero 元素,c 将重置为 0。只要 c >= N
将结果数组设置为 True
在适当的 索引.
import numba as nb # tested with numba 0.55.1
@nb.jit
def numba_linear(A, N):
B = np.zeros(len(A), dtype=np.bool_)
c = 0
for i in range(len(A)):
c = c+1 if A[i] == 0 else 0
if c >= N: B[i - N + 1] = 1
return B
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
numba_linear(A, 3)
输出
array([False, False, False, False, True, True, True, True, False,
False])
应用了针对@Nephat with @Mercury's loop 的基准测试(sum
在我的基准测试中比 any
快)。
import numpy as np
@nb.njit
def baseline_numba(A: np.ndarray, N: int):
'''
You may need to run this once first to see performance gains
as this has to be JIT compiled by Numba
'''
B = np.zeros(len(A), dtype=np.int8)
for i in nb.prange(len(A)-N+1):
if A[i:i + N].sum() == 0:
B[i] = 1
return B
rng = np.random.default_rng()
A = rng.integers(2, size=20000)
N = 3
%timeit baseline_numba(A, N)
#10000 loops, best of 5: 120 µs per loop
%timeit numba_linear(A, N)
#10000 loops, best of 5: 29.4 µs per loop
np.testing.assert_array_equal(numba_linear(A,N), baseline_numba(A,N))
TL;DR
只要块大小N
(以下简称m
或size
)足够大,并且输入有大量 non-zero 值:
import numpy as np
import numba as nb
@nb.njit
def find_blocks_while2_nb(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
i = j = 0
while i < n - size + 1:
# last iteration found a block, only checking one value
if j < i and arr[i + size - 1] == value:
result[i] = True
i += 1
else:
# search backward for index of last non-block value
j = i + size - 1
while j >= i and arr[j] == value:
j -= 1
if j < i:
# block found, advance by 1
result[i] = True
i += 1
else:
# block not found, advance by last non-block value occurrence
j += 1
i = j
return result
本质上,它是您原始方法的 Numba 加速版本,具有多个 micro-optimizations:
- 通过对主计数器设置适当的限制来减少主循环的执行次数
- 检查块是否包含所有相同值的部分:
- 避免相对昂贵的数组切片
- returns失败的位置
- 运行向后
- 避免re-checking个连续的块
对于小块大小,更多 micro-optimized 方法可能会更快,如其他答案中所述,并在下面简要讨论。
简介
找到相同值块开始处的索引的问题可以看作是计算机科学中经典问题的专门化:在数组中找到 sub-array(详细讨论 here).
在这种情况下,sub-string(即块)由一系列相同的值组成,更准确地说是 0
s。
理论
这个问题的复杂性通常取决于数组的大小 (n
) 和块的大小 (m
或 size
),以及作为它们中的实际值(显然)。
当然,假设 n > m
,因为很明显,如果 n == m
的解决方案是微不足道的,而当 n < m
时,数组无法容纳该块。
同样的道理,也找不到n - m
以上的索引。
虽然天真的实现基本上需要两个嵌套循环,一个用于字符串,一个用于 sub-string(这将导致 O(nm)
计算复杂性),但可以利用以下事实该块包含与 运行 算法在保证线性时间 O(n)
和恒定内存要求 O(1)
.
中相同的值
但是,假设输入数组具有块值和 non-block 值的混合分布,则可以在大约 运行 内完成。 O(n / (m - ε))
时间,仍然有 O(1)
内存消耗。 ε < m - 1
是一个与找到 non-block 值的概率成正比的参数。
这个想法是,当试图确定一个块是否从位置 i
开始时,如果数组在位置 j
和 j - i < m
处有一个 non-block 值,那么下一个要评估的位置是 j + 1
,因为该块不能放在 i
和 j
之间。
此外,如果搜索从后面开始(即在位置 i - m
),那么对于足够异构的输入,未检查的位置数量是最大的,因为不需要检查所有 j - i
值.
出于类似的原因,如果知道一个块从位置i
开始,要检查一个块是否从位置i + j
和j < m
开始,只有[之间的值=50=]和i + m + j
需要检查,i
和i + m - 1
之间的值不需要再次检查。
实施
这是一个手动案例,其中生成器非常适合,因为可以利用其惰性评估属性仅在需要时执行少量计算。
此外,事先不知道结果有多大(即可能找到很多块或根本找不到块)。
但是,如果这是更大计算的一部分,则 pre-allocate 结果的大小并仅修改其索引的值可能会更快。
不需要分配超过 n - m + 1
个值。
但是,为了与问题的要求兼容,输入大小的数组是pre-allocated。
如果需要,可以很容易地调整以下实现以避免浪费这些额外的内存。
更准确地说,基于显式循环的解决方案可以通过将 result[i] = True
替换为 yield i
(或类似的)来转换为生成器。
同样,本机计算布尔数组的解决方案可以转换为生成器,例如:
for i in np.nonzero(result)[0]:
yield i
分析的其余部分将假设目标结果是布尔数组。
在纯 Python 中实现这一点的挑战之一是显式循环相对较慢。
因此,使用一些加速技术(例如 Numba 或 Cython)来减少计算时间是很方便的。
或者,鉴于该算法主要由两个循环组成(一个用于数组,一个用于 sub-array),我们可能会寻求对一个或两个循环进行矢量化。
但是,请注意,这些方法必然会在 O(nm)
worst-case 时间内 运行。
基本的天真方法如下:
def find_blocks_for2(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
for i in range(n - size + 1):
all_equal = True
for j in range(i, i + size):
if arr[j] != value:
all_equal = False
break # vectorized approaches may not have this
if all_equal:
result[i] = True
return result
这可以在多个方面进行改进。
一种方法源于内循环实际上是无用的,因为它足以保持跟踪出现的连续“好”值的数量(这与 基本相同):
import numpy as np
def find_blocks_for(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
block_size = 0
for i in range(n):
if arr[i] == value:
block_size += 1
else:
block_size = 0
if block_size >= size:
result[i - size + 1] = True
return result
请注意,这可以看作是 Rabin-Karp algorithm 的一种特殊化,其中散列的作用由 block_size
计数器发挥。
另一种可能的方法是仅当最后一次迭代已经识别出块的存在时才跳过第二个循环,并使其向后 运行ning 以最大化跳转到下一个可能的块:
import numpy as np
def find_blocks_while2(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
i = j = 0
while i < n - size + 1:
# last iteration found a block, only checking one value
if j < i and arr[i + size - 1] == value:
result[i] = True
i += 1
else:
# search backward for index of last non-block value
j = i + size - 1
while j >= i and arr[j] == value:
j -= 1
if j < i:
# block found, advance by 1
result[i] = True
i += 1
else:
# block not found, advance by last non-block value occurrence
j += 1
i = j
return result
请注意,这可以看作是 Knuth-Morris-Pratt algorithm 的特化,不需要为块的每个值计算偏移量(它们都是相同的)。
可以为上述方法设计许多变体,所有变体都具有相同的渐近行为,但 运行ning 时间略有不同,具体取决于实际输入。
例如,可以将条件的顺序更改为 speed-up 找到许多连续块的情况,代价是 运行 在其他地方执行更多指令,或者相反。
所有这些 运行 在没有使用 Numba、Cython 或 Pypy 加速的情况下都太慢了。
Pypy 使用不同的解释器,我不会 discuss/benchmark 它进一步。
Numba 提供了一种最直接有效的加速方式(只需应用装饰器),例如:
import numba as nb
find_blocks_while2_nb = nb.njit(find_blocks_while2)
find_blocks_while2_nb.__name__ = "find_blocks_while2_nb"
也可以使用Cython,例如(使用 %load_ext Cython
Jupyter 魔法),但要充分利用编译,需要格外小心以确保瓶颈代码 运行s 没有 Python 交互:
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import cython as cy
import numpy as np
cdef _find_blocks_lin_cy(long[::1] arr, char[::1] result, Py_ssize_t n, Py_ssize_t size, long value):
cdef Py_ssize_t block_size = 0
for i in range(n):
if arr[i] == value:
block_size += 1
else:
block_size = 0
if block_size >= size:
result[i - size + 1] = True
def find_blocks_lin_cy(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
_find_blocks_lin_cy(arr, result, n, size, value)
return result
Numba 还是 Cython 会更快取决于代码可以触发多少优化,因此也取决于使用哪种编译器和 CPU 组合。
或者,可以对内部循环或外部循环(或两者)进行矢量化。
当然这个对内循环多还是外循环多运行最有效。
最大的循环应该 运行 向量化最大 speed-up.
矢量化内部循环会导致接近于此的结果(类似于 ,但边界更小,sum()
替换为 np.all()
,它也支持 short-circuiting):
def find_blocks_for_all(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
for i in range(n - size + 1):
if np.all(arr[i:i + size] == value):
result[i] = True
return result
向量化外循环会导致类似的结果(类似于 中呈现的内容,但整体代码更清晰,尤其是避免不必要的函数调用):
def find_blocks_for_and(arr, size, value):
result = (arr == value)
for i in range(1, size):
result[:-1] &= result[1:]
result[1 - size:] = False
return result
向量化两个循环将导致接近于此的结果(类似于 中的一种方法):
def find_blocks_strides(arr, size, value):
n = arr.size
strides = (arr.itemsize,) * 2
block = np.full(size, value, dtype=arr.dtype)
# note that `as_strided()` does not allocate extra memory
# the downside is that buffer overflow will occur and
# extra care must be taken not to use data outside the boundaries
check = np.lib.stride_tricks.as_strided(arr, (n, size), strides)
result = np.all(check == block, axis=-1)
result[1 - size:] = False
return result
假设输入仅包含正值且该块仅包含零,则上述代码可以进一步专门化。
这意味着将 np.all()
调用替换为 np.sum()
或类似的调用。
这实际上是有益的一种方法是使用完全矢量化方法,其中 as_strided()
可以通过计算与 non-zero 块的相关性来代替(类似于 中的一种方法):
def find_blocks_0_conv(arr, size):
n = arr.size
result = np.zeros_like(arr, dtype=np.bool_)
block = np.ones(size, dtype=arr.dtype)
result[:1 - size] = (np.correlate(arr, block, 'valid') == 0)
return result
除了循环之外,运行ning 时间还取决于代码如何转换为硬件指令以及它们最终有多快。
例如,如果 NumPy 是在完全支持 SIMD 指令的情况下编译的,那么对于许多输入组合来说,矢量化(但不是理论上最优的)方法很可能会优于单纯加速的理论上最优的方法。
对于以加速技术可以很好地利用 SIMD 指令的方式编写的方法,也会发生同样的情况。
但是,它们提供的额外速度将仅限于特定系统(即编译器和处理器的特定组合)。
一个这样的例子如下(本质上是对 的改进,以避免全局变量并删除不必要的循环):
def find_blocks_simd_nb_cached(arr, size, value=0, cache={}):
assert size > 0
if size not in cache:
print('cached: ', list(cache.keys()))
def find_blocks_simd_nb(arr, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
check = (arr == value)
for i in range(n - size + 1):
# using a `temp` variable avoid unnecessarily accessing the `check` array
temp = check[i]
for j in range(1, size):
temp &= check[i + j]
result[i] = temp
return result
cache[size] = nb.njit(find_blocks_simd_nb)
return cache[size](arr, value)
基准测试
此处报告了一些基准测试。
一如既往,他们应该持保留态度。
对于不同的输入大小、块大小和值,只考虑最快的方法。
_nb
和 _cy
(find_blocks_for_cy()
除外本质上是相同函数的 non-optimized 版本,没有上面的 _nb
或 _cy
后缀。
不同的输入大小,块大小 = 4
1 和 0 的均匀分布
全0
全1
不同的块大小,输入大小 = 600_000
1 和 0 的均匀分布
全0
全1
注意:
find_blocks_for_all()
对于足够大的 size
值实际上很快
find_blocks_for_and()
对于较小的 size
值非常快并且不需要 Numba 或 Cython
find_blocks_for_nb()
基本上独立于块大小并且整体表现非常好
find_blocks_simd_nb()
对于 size
的小值非常快,但执行相对 porly 对于更大的值
find_blocks_while2_nb()
对于较大的 size
值确实很出色,对于较小的 值也相当快
(提供完整基准测试 here)。
这个只是 numpy,O(len(A)) 类似于 Ali_Sh 的答案,但更短更快。
它确实需要对数组的边缘进行一些笨拙的处理。
为 cumsum 选择较小的 dtype 是 micro-optimization 我不确定是否值得..
def numpy_linear(A, N):
dtype = np.min_scalar_type(N)
assert issubclass(dtype.type, np.unsignedinteger)
# The cumsum can overflow for longer arrays
# but the difference calculation will still be correct
c = np.cumsum(A == 0, dtype=dtype)
B = np.r_[c[N-1] == N, c[N:]-c[:-N] == N, np.zeros(N-1, bool)]
return B
您可以使用卷积过滤器来查看连续的零。由于末尾的“剩余”零不可能为True,如果没有它们的N长度序列,您总是可以用Falses完成结果。
import numpy as np
N = 3
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
conv=np.convolve(np.ones((N)),A,mode='valid')
B=np.where(conv==0,True,False)
B=np.append(B,np.zeros((N-len(A)%N),dtype=bool))
结果是:
[False False False False True True True True False False]
应该只用按位数学就可以了。不确定速度,可能不会超过 numba
,但应该超过任何带有 for
循环的东西。
def find_blocks(A, N):
reduce = lambda x: np.logical_and(x[:-1], x[1:], out = x[:-1])
x = np.logical_not(A)
for i in range(N-1):
reduce(x)
x[-N+1:] = False
return x
find_blocks(A, 3)
Out[]:
array([False, False, False, False, True, True, True, True, False,
False])
对于 N
的小值(在运行时变化不大),最好的解决方案是使用 SIMD-friendly Numba 实现 专门用于每个可能的特定 N
值。当 N <= ~30
时,编译器可以生成非常高效的代码。对于更大的 N
值,@norok2 的解决方案开始变得更好,因为只要没有太多 0 项,它就可以跳过数组的许多项。当0的个数很大时,只要N <= 64
这个函数还是有竞争力的。当N > 64
时,请使用更适合的实现。
# Cache dictionary meant to store function for a given N
cache = {}
def compute(A, N):
assert N > 0
if N not in cache:
# Numba function to be compiled for a specific N
def compute_specific(A):
out = np.zeros(A.size, dtype=np.bool_)
isZero = (A == 0).view(np.uint8)
for i in range(isZero.size-N+1):
allSame = isZero[i]
for j in range(1, N):
allSame &= isZero[i+j]
out[i] = allSame
for i in range(A.size-N+1, A.size):
out[i] = 0
return out
cache[N] = nb.njit(compute_specific)
return cache[N](A)
编译器可以使用 SIMD 指令(如 AVX-2)auto-vectorize 此代码,以便在现代 x86 处理器上每个周期计算 32~64 项。较大的 N
值需要更多的 SIMD 指令,导致函数效率较低。相比之下,使用条件分支的实现往往很慢,因为 miss-predicted 分支在主流 x86 处理器上通常花费大约 10~15 个周期,并且每次迭代一次只能计算一个标量项(即~2 个数量级)假设工作量相同,速度会变慢)。
基准
这是在我的机器 (Intel Xeon Skylake) 上使用大小为 100_000 的随机(32 位)整数数组的结果。数组中大约有 50% 的 0 值。所有实现中都排除了 Numba 编译时间。
With N = 3:
- Initial solution: 115_000 µs
- Ali_Sh: 1_575 µs
- Naphat (stridetricks): 929 µs
- Naphat (correlational): 528 µs
- Michael Szczesny: 350 µs
- Norok2: 306 µs
- Franciska: 295 µs
- This code: 11 us <---
With N = 10:
- Initial solution: 149_000 µs
- Ali_Sh: 1_592 µs
- Naphat (stridetricks): 1_139 µs
- Naphat (correlational): 831 µs
- Michael Szczesny: 489 µs
- Franciska: 444 µs
- Norok2: 98 µs
- This code: 14 µs <---
With N = 30:
- Ali_Sh: [FAIL]
- Initial solution: 255_000 µs
- Franciska: 2_285 µs
- Naphat (correlational): 1_936 µs
- Naphat (stridetricks): 1_628 µs
- Michael Szczesny: 647 µs
- Norok2: 30 µs
- This code: 25 µs <---
With N = 60:
- Ali_Sh: [FAIL]
- Initial solution: 414_000 µs
- Naphat (correlational): 3_814 µs
- Franciska: 3_242 µs
- Naphat (stridetricks): 3_048 µs
- Michael Szczesny: 816 µs
- This code: 50 µs <---
- Norok2: 14 µs
我们可以清楚地看到这个函数是最快的(大幅度),除非 N 很大(在这种情况下,Norok2 的代码更快)。
请假定以下 NumPy 数组:
A = array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
我想找到这个数组中 N
个连续值等于零(含)的索引。
例如,假设 N=3
。我们知道 A[2]=0
而 A[3]>0
。这样,数组的第二个元素A
就不是连续三个零值(含)。数组 A 的理想结果如下所示:
B = array([False, False, False, False, True, True, True, True, False, False])
我可以写一个循环作为这个问题的答案:
N = 3
A = np.array([1, 1, 0, 0, 0, 0, 0, 0, 0, 0])
B = np.zeros(len(A), dtype=bool)
for i in range(len(A)):
if (i + N <= len(A)) and (sum(A[i:i + N]) == 0):
B[i] = True
由于矩阵 A 可能更大,而且我需要执行上述搜索过程数百万次,因此我需要最快的方法。你的建议是什么?
要获取自己的索引,可以从以下方法入手:
import bumpy as np
my__Truth = np.array([True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, False, True])
np.sort(my_Truth, in_place=my_Truth[0])
# ~ Obtain Index ~ #
index_values = np.where(my_Truth)
print(index_values)
# un-comment if using python2
# print index_values
我已经实施了一些您可以尝试的其他替代方案:
- 使用
numba
JIT 编译循环代码 - 使用
np.correlate
做一个signal-processing类型的相关,然后使用nonzero
获得指数 - 同上,但使用
stride_tricks
代替
我想对解决方案进行基准测试,但这里有点晚了。如果你愿意,我明天下班后可以做。
我也将 dtype=np.int8
用于 B
,因为更容易直观地确认输出是否正确(请参阅输出)。
import numpy as np
import numba
from numpy.lib.stride_tricks import as_strided
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
N = 3
def baseline(A: np.ndarray):
'''
Your solution
'''
B = np.zeros(len(A), dtype=np.int8)
for i in range(len(A)):
if (i + N <= len(A)) and (sum(A[i:i + N]) == 0):
B[i] = 1
return B
@numba.njit()
def baseline_numba(A: np.ndarray, N: int):
'''
You may need to run this once first to see performance gains
as this has to be JIT compiled by Numba
'''
B = np.zeros(len(A), dtype=np.int8)
for i in range(len(A)):
if (i + N <= len(A)) and (A[i:i + N].sum() == 0):
B[i] = 1
return B
def correlational(A: np.ndarray):
'''
Use a correlation operation
'''
B = np.zeros_like(A, dtype=np.int8)
B[(np.correlate(A, np.full(N, 1), 'valid') == 0).nonzero()[0]] = 1
return B
def stridetricks(A: np.ndarray):
'''
Same idea as using correlation, but with stride tricks
Maybe it will be faster? Probably not.
'''
u = np.array(A.itemsize)
B = np.zeros_like(A, dtype=np.int8)
B[(as_strided(A, (len(A) - N + 1, N), u * (1, 1)).sum(1) == 0).nonzero()[0]] = 1
return B
print(baseline(A))
print(baseline_numba(A, N))
print(correlational(A))
print(stridetricks(A))
输出:
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
查找连续值和相关索引是其他 SO 问题中有解决方案的主题之一。在这方面我们可以发现range indices for repeated zeros and their numbers:
def zero_runs(a):
iszero = np.concatenate(([0], np.equal(a, 0).view(np.int8), [0]))
absdiff = np.abs(np.diff(iszero))
ranges = np.where(absdiff == 1)[0].reshape(-1, 2)
return ranges
# [[ 2 3]
# [ 4 10]]
当我们找到索引 运行ges 时,我们可以
def filled_array(start, end, length):
out = np.zeros((length), dtype=int)
np.add.at(out, start, 1)
np.add.at(out, end, -1)
return out.cumsum() > 0
def filled_array_v2(start, end, length):
out =np.bincount(start, minlength=length) - np.bincount(end, minlength=length)
return out.cumsum().astype(bool)
因此,通过使用上述函数并定义如下主函数:
def final(A, N):
runs = zero_runs(A)
repeats_cound = runs[:, 1] - runs[:, 0]
# [1 6]
suspected_ranges = runs[repeats_cound >= 3]
# [[ 4 10]]
start = suspected_ranges[:, 0]
end = start + N + 1
return filled_array(start, end, length=A.size) # filled_array_v2(start, end, length=A.size)
# [False False False False True True True True False False]
在我关于数据数组体积 10000
的一项基准测试中,此代码 运行 比提议的 numba 方法快
一些基准测试 (Colab)
通过使用 any
(而不是 sum
)作为 @Mercury 提到的循环 nb.prange
和 parallel=True
并行化它。
按A尺码:20000on Colab:
# N ----> 3:
1000 loops, best of 5: 105 µs per loop Norok
--------------------
100 loops, best of 5: 35 ms per loop Nephat baseline
100 loops, best of 5: 730 µs per loop Nephat baseline_numba
1000 loops, best of 5: 225 µs per loop Nephat correlational
1000 loops, best of 5: 397 µs per loop Nephat stridetricks
--------------------
1000 loops, best of 5: 42.3 µs per loop Michael Szczesny (The fastest)
--------------------
1000 loops, best of 5: 352 µs per loop This answer
1000 loops, best of 5: 279 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 10:
1000 loops, best of 5: 25.3 µs per loop Norok (The fastest)
--------------------
100 loops, best of 5: 46.8 ms per loop Nephat baseline
100 loops, best of 5: 794 µs per loop Nephat baseline_numba
1000 loops, best of 5: 318 µs per loop Nephat correlational
1000 loops, best of 5: 433 µs per loop Nephat stridetricks
--------------------
1000 loops, best of 5: 31.9 µs per loop Michael Szczesny
--------------------
1000 loops, best of 5: 341 µs per loop This answer
1000 loops, best of 5: 291 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 20
1000 loops, best of 5: 6.52 µs per loop Norok (The fastest)
--------------------
1000 loops, best of 5: 31.5 µs per loop Michael Szczesny
--------------------
1000 loops, best of 5: 350 µs per loop This answer
1000 loops, best of 5: 292 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 30
1000 loops, best of 5: 4.01 µs per loop Norok (The fastest)
--------------------
1000 loops, best of 5: 31.8 µs per loop Michael Szczesny
最新基准:
根据我对 the prepared colab link(最后两个单元格)的上次测试,以下结果分别显示了最佳性能:
- for
N <10
---> 理查德,(==
) 丹尼尔 - for
10 < N <20
---> Richard, Michael, (==
) Norok - for
20 < N <60
---> Norok, Richard, Michael - for
N >60
---> Norok, Michael, Richard
具有线性时间复杂度的解决方案,使用numba
编译。
想法是查看 A 的每个元素一次并计算计数器 c 中零的出现次数。如果遇到 non-zero 元素,c 将重置为 0。只要 c >= N
将结果数组设置为 True
在适当的 索引.
import numba as nb # tested with numba 0.55.1
@nb.jit
def numba_linear(A, N):
B = np.zeros(len(A), dtype=np.bool_)
c = 0
for i in range(len(A)):
c = c+1 if A[i] == 0 else 0
if c >= N: B[i - N + 1] = 1
return B
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
numba_linear(A, 3)
输出
array([False, False, False, False, True, True, True, True, False,
False])
应用了针对@Nephat sum
在我的基准测试中比 any
快)。
import numpy as np
@nb.njit
def baseline_numba(A: np.ndarray, N: int):
'''
You may need to run this once first to see performance gains
as this has to be JIT compiled by Numba
'''
B = np.zeros(len(A), dtype=np.int8)
for i in nb.prange(len(A)-N+1):
if A[i:i + N].sum() == 0:
B[i] = 1
return B
rng = np.random.default_rng()
A = rng.integers(2, size=20000)
N = 3
%timeit baseline_numba(A, N)
#10000 loops, best of 5: 120 µs per loop
%timeit numba_linear(A, N)
#10000 loops, best of 5: 29.4 µs per loop
np.testing.assert_array_equal(numba_linear(A,N), baseline_numba(A,N))
TL;DR
只要块大小N
(以下简称m
或size
)足够大,并且输入有大量 non-zero 值:
import numpy as np
import numba as nb
@nb.njit
def find_blocks_while2_nb(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
i = j = 0
while i < n - size + 1:
# last iteration found a block, only checking one value
if j < i and arr[i + size - 1] == value:
result[i] = True
i += 1
else:
# search backward for index of last non-block value
j = i + size - 1
while j >= i and arr[j] == value:
j -= 1
if j < i:
# block found, advance by 1
result[i] = True
i += 1
else:
# block not found, advance by last non-block value occurrence
j += 1
i = j
return result
本质上,它是您原始方法的 Numba 加速版本,具有多个 micro-optimizations:
- 通过对主计数器设置适当的限制来减少主循环的执行次数
- 检查块是否包含所有相同值的部分:
- 避免相对昂贵的数组切片
- returns失败的位置
- 运行向后
- 避免re-checking个连续的块
对于小块大小,更多 micro-optimized 方法可能会更快,如其他答案中所述,并在下面简要讨论。
简介
找到相同值块开始处的索引的问题可以看作是计算机科学中经典问题的专门化:在数组中找到 sub-array(详细讨论 here).
在这种情况下,sub-string(即块)由一系列相同的值组成,更准确地说是 0
s。
理论
这个问题的复杂性通常取决于数组的大小 (n
) 和块的大小 (m
或 size
),以及作为它们中的实际值(显然)。
当然,假设 n > m
,因为很明显,如果 n == m
的解决方案是微不足道的,而当 n < m
时,数组无法容纳该块。
同样的道理,也找不到n - m
以上的索引。
虽然天真的实现基本上需要两个嵌套循环,一个用于字符串,一个用于 sub-string(这将导致 O(nm)
计算复杂性),但可以利用以下事实该块包含与 运行 算法在保证线性时间 O(n)
和恒定内存要求 O(1)
.
但是,假设输入数组具有块值和 non-block 值的混合分布,则可以在大约 运行 内完成。 O(n / (m - ε))
时间,仍然有 O(1)
内存消耗。 ε < m - 1
是一个与找到 non-block 值的概率成正比的参数。
这个想法是,当试图确定一个块是否从位置 i
开始时,如果数组在位置 j
和 j - i < m
处有一个 non-block 值,那么下一个要评估的位置是 j + 1
,因为该块不能放在 i
和 j
之间。
此外,如果搜索从后面开始(即在位置 i - m
),那么对于足够异构的输入,未检查的位置数量是最大的,因为不需要检查所有 j - i
值.
出于类似的原因,如果知道一个块从位置i
开始,要检查一个块是否从位置i + j
和j < m
开始,只有[之间的值=50=]和i + m + j
需要检查,i
和i + m - 1
之间的值不需要再次检查。
实施
这是一个手动案例,其中生成器非常适合,因为可以利用其惰性评估属性仅在需要时执行少量计算。 此外,事先不知道结果有多大(即可能找到很多块或根本找不到块)。
但是,如果这是更大计算的一部分,则 pre-allocate 结果的大小并仅修改其索引的值可能会更快。
不需要分配超过 n - m + 1
个值。
但是,为了与问题的要求兼容,输入大小的数组是pre-allocated。
如果需要,可以很容易地调整以下实现以避免浪费这些额外的内存。
更准确地说,基于显式循环的解决方案可以通过将 result[i] = True
替换为 yield i
(或类似的)来转换为生成器。
同样,本机计算布尔数组的解决方案可以转换为生成器,例如:
for i in np.nonzero(result)[0]:
yield i
分析的其余部分将假设目标结果是布尔数组。
在纯 Python 中实现这一点的挑战之一是显式循环相对较慢。 因此,使用一些加速技术(例如 Numba 或 Cython)来减少计算时间是很方便的。
或者,鉴于该算法主要由两个循环组成(一个用于数组,一个用于 sub-array),我们可能会寻求对一个或两个循环进行矢量化。
但是,请注意,这些方法必然会在 O(nm)
worst-case 时间内 运行。
基本的天真方法如下:
def find_blocks_for2(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
for i in range(n - size + 1):
all_equal = True
for j in range(i, i + size):
if arr[j] != value:
all_equal = False
break # vectorized approaches may not have this
if all_equal:
result[i] = True
return result
这可以在多个方面进行改进。
一种方法源于内循环实际上是无用的,因为它足以保持跟踪出现的连续“好”值的数量(这与
import numpy as np
def find_blocks_for(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
block_size = 0
for i in range(n):
if arr[i] == value:
block_size += 1
else:
block_size = 0
if block_size >= size:
result[i - size + 1] = True
return result
请注意,这可以看作是 Rabin-Karp algorithm 的一种特殊化,其中散列的作用由 block_size
计数器发挥。
另一种可能的方法是仅当最后一次迭代已经识别出块的存在时才跳过第二个循环,并使其向后 运行ning 以最大化跳转到下一个可能的块:
import numpy as np
def find_blocks_while2(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
i = j = 0
while i < n - size + 1:
# last iteration found a block, only checking one value
if j < i and arr[i + size - 1] == value:
result[i] = True
i += 1
else:
# search backward for index of last non-block value
j = i + size - 1
while j >= i and arr[j] == value:
j -= 1
if j < i:
# block found, advance by 1
result[i] = True
i += 1
else:
# block not found, advance by last non-block value occurrence
j += 1
i = j
return result
请注意,这可以看作是 Knuth-Morris-Pratt algorithm 的特化,不需要为块的每个值计算偏移量(它们都是相同的)。
可以为上述方法设计许多变体,所有变体都具有相同的渐近行为,但 运行ning 时间略有不同,具体取决于实际输入。 例如,可以将条件的顺序更改为 speed-up 找到许多连续块的情况,代价是 运行 在其他地方执行更多指令,或者相反。
所有这些 运行 在没有使用 Numba、Cython 或 Pypy 加速的情况下都太慢了。 Pypy 使用不同的解释器,我不会 discuss/benchmark 它进一步。
Numba 提供了一种最直接有效的加速方式(只需应用装饰器),例如:
import numba as nb
find_blocks_while2_nb = nb.njit(find_blocks_while2)
find_blocks_while2_nb.__name__ = "find_blocks_while2_nb"
也可以使用Cython,例如(使用 %load_ext Cython
Jupyter 魔法),但要充分利用编译,需要格外小心以确保瓶颈代码 运行s 没有 Python 交互:
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import cython as cy
import numpy as np
cdef _find_blocks_lin_cy(long[::1] arr, char[::1] result, Py_ssize_t n, Py_ssize_t size, long value):
cdef Py_ssize_t block_size = 0
for i in range(n):
if arr[i] == value:
block_size += 1
else:
block_size = 0
if block_size >= size:
result[i - size + 1] = True
def find_blocks_lin_cy(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
_find_blocks_lin_cy(arr, result, n, size, value)
return result
Numba 还是 Cython 会更快取决于代码可以触发多少优化,因此也取决于使用哪种编译器和 CPU 组合。
或者,可以对内部循环或外部循环(或两者)进行矢量化。 当然这个对内循环多还是外循环多运行最有效。 最大的循环应该 运行 向量化最大 speed-up.
矢量化内部循环会导致接近于此的结果(类似于 sum()
替换为 np.all()
,它也支持 short-circuiting):
def find_blocks_for_all(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
for i in range(n - size + 1):
if np.all(arr[i:i + size] == value):
result[i] = True
return result
向量化外循环会导致类似的结果(类似于
def find_blocks_for_and(arr, size, value):
result = (arr == value)
for i in range(1, size):
result[:-1] &= result[1:]
result[1 - size:] = False
return result
向量化两个循环将导致接近于此的结果(类似于
def find_blocks_strides(arr, size, value):
n = arr.size
strides = (arr.itemsize,) * 2
block = np.full(size, value, dtype=arr.dtype)
# note that `as_strided()` does not allocate extra memory
# the downside is that buffer overflow will occur and
# extra care must be taken not to use data outside the boundaries
check = np.lib.stride_tricks.as_strided(arr, (n, size), strides)
result = np.all(check == block, axis=-1)
result[1 - size:] = False
return result
假设输入仅包含正值且该块仅包含零,则上述代码可以进一步专门化。
这意味着将 np.all()
调用替换为 np.sum()
或类似的调用。
这实际上是有益的一种方法是使用完全矢量化方法,其中 as_strided()
可以通过计算与 non-zero 块的相关性来代替(类似于
def find_blocks_0_conv(arr, size):
n = arr.size
result = np.zeros_like(arr, dtype=np.bool_)
block = np.ones(size, dtype=arr.dtype)
result[:1 - size] = (np.correlate(arr, block, 'valid') == 0)
return result
除了循环之外,运行ning 时间还取决于代码如何转换为硬件指令以及它们最终有多快。 例如,如果 NumPy 是在完全支持 SIMD 指令的情况下编译的,那么对于许多输入组合来说,矢量化(但不是理论上最优的)方法很可能会优于单纯加速的理论上最优的方法。 对于以加速技术可以很好地利用 SIMD 指令的方式编写的方法,也会发生同样的情况。 但是,它们提供的额外速度将仅限于特定系统(即编译器和处理器的特定组合)。
一个这样的例子如下(本质上是对
def find_blocks_simd_nb_cached(arr, size, value=0, cache={}):
assert size > 0
if size not in cache:
print('cached: ', list(cache.keys()))
def find_blocks_simd_nb(arr, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
check = (arr == value)
for i in range(n - size + 1):
# using a `temp` variable avoid unnecessarily accessing the `check` array
temp = check[i]
for j in range(1, size):
temp &= check[i + j]
result[i] = temp
return result
cache[size] = nb.njit(find_blocks_simd_nb)
return cache[size](arr, value)
基准测试
此处报告了一些基准测试。 一如既往,他们应该持保留态度。
对于不同的输入大小、块大小和值,只考虑最快的方法。
_nb
和 _cy
(find_blocks_for_cy()
除外本质上是相同函数的 non-optimized 版本,没有上面的 _nb
或 _cy
后缀。
不同的输入大小,块大小 = 4
1 和 0 的均匀分布
全0
全1
不同的块大小,输入大小 = 600_000
1 和 0 的均匀分布
全0
全1
注意:
find_blocks_for_all()
对于足够大的size
值实际上很快
find_blocks_for_and()
对于较小的size
值非常快并且不需要 Numba 或 Cythonfind_blocks_for_nb()
基本上独立于块大小并且整体表现非常好find_blocks_simd_nb()
对于size
的小值非常快,但执行相对 porly 对于更大的值find_blocks_while2_nb()
对于较大的size
值确实很出色,对于较小的 值也相当快
(提供完整基准测试 here)。
这个只是 numpy,O(len(A)) 类似于 Ali_Sh 的答案,但更短更快。 它确实需要对数组的边缘进行一些笨拙的处理。 为 cumsum 选择较小的 dtype 是 micro-optimization 我不确定是否值得..
def numpy_linear(A, N):
dtype = np.min_scalar_type(N)
assert issubclass(dtype.type, np.unsignedinteger)
# The cumsum can overflow for longer arrays
# but the difference calculation will still be correct
c = np.cumsum(A == 0, dtype=dtype)
B = np.r_[c[N-1] == N, c[N:]-c[:-N] == N, np.zeros(N-1, bool)]
return B
您可以使用卷积过滤器来查看连续的零。由于末尾的“剩余”零不可能为True,如果没有它们的N长度序列,您总是可以用Falses完成结果。
import numpy as np
N = 3
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
conv=np.convolve(np.ones((N)),A,mode='valid')
B=np.where(conv==0,True,False)
B=np.append(B,np.zeros((N-len(A)%N),dtype=bool))
结果是:
[False False False False True True True True False False]
应该只用按位数学就可以了。不确定速度,可能不会超过 numba
,但应该超过任何带有 for
循环的东西。
def find_blocks(A, N):
reduce = lambda x: np.logical_and(x[:-1], x[1:], out = x[:-1])
x = np.logical_not(A)
for i in range(N-1):
reduce(x)
x[-N+1:] = False
return x
find_blocks(A, 3)
Out[]:
array([False, False, False, False, True, True, True, True, False,
False])
对于 N
的小值(在运行时变化不大),最好的解决方案是使用 SIMD-friendly Numba 实现 专门用于每个可能的特定 N
值。当 N <= ~30
时,编译器可以生成非常高效的代码。对于更大的 N
值,@norok2 的解决方案开始变得更好,因为只要没有太多 0 项,它就可以跳过数组的许多项。当0的个数很大时,只要N <= 64
这个函数还是有竞争力的。当N > 64
时,请使用更适合的实现。
# Cache dictionary meant to store function for a given N
cache = {}
def compute(A, N):
assert N > 0
if N not in cache:
# Numba function to be compiled for a specific N
def compute_specific(A):
out = np.zeros(A.size, dtype=np.bool_)
isZero = (A == 0).view(np.uint8)
for i in range(isZero.size-N+1):
allSame = isZero[i]
for j in range(1, N):
allSame &= isZero[i+j]
out[i] = allSame
for i in range(A.size-N+1, A.size):
out[i] = 0
return out
cache[N] = nb.njit(compute_specific)
return cache[N](A)
编译器可以使用 SIMD 指令(如 AVX-2)auto-vectorize 此代码,以便在现代 x86 处理器上每个周期计算 32~64 项。较大的 N
值需要更多的 SIMD 指令,导致函数效率较低。相比之下,使用条件分支的实现往往很慢,因为 miss-predicted 分支在主流 x86 处理器上通常花费大约 10~15 个周期,并且每次迭代一次只能计算一个标量项(即~2 个数量级)假设工作量相同,速度会变慢)。
基准
这是在我的机器 (Intel Xeon Skylake) 上使用大小为 100_000 的随机(32 位)整数数组的结果。数组中大约有 50% 的 0 值。所有实现中都排除了 Numba 编译时间。
With N = 3:
- Initial solution: 115_000 µs
- Ali_Sh: 1_575 µs
- Naphat (stridetricks): 929 µs
- Naphat (correlational): 528 µs
- Michael Szczesny: 350 µs
- Norok2: 306 µs
- Franciska: 295 µs
- This code: 11 us <---
With N = 10:
- Initial solution: 149_000 µs
- Ali_Sh: 1_592 µs
- Naphat (stridetricks): 1_139 µs
- Naphat (correlational): 831 µs
- Michael Szczesny: 489 µs
- Franciska: 444 µs
- Norok2: 98 µs
- This code: 14 µs <---
With N = 30:
- Ali_Sh: [FAIL]
- Initial solution: 255_000 µs
- Franciska: 2_285 µs
- Naphat (correlational): 1_936 µs
- Naphat (stridetricks): 1_628 µs
- Michael Szczesny: 647 µs
- Norok2: 30 µs
- This code: 25 µs <---
With N = 60:
- Ali_Sh: [FAIL]
- Initial solution: 414_000 µs
- Naphat (correlational): 3_814 µs
- Franciska: 3_242 µs
- Naphat (stridetricks): 3_048 µs
- Michael Szczesny: 816 µs
- This code: 50 µs <---
- Norok2: 14 µs
我们可以清楚地看到这个函数是最快的(大幅度),除非 N 很大(在这种情况下,Norok2 的代码更快)。