将 numpy 掩码有效地扩展到每个错误值的右侧 n 个单元格
Extend numpy mask by n cells to the right for each bad value, efficiently
假设我有一个长度为 30 的数组,其中有 4 个错误值。我想为那些坏值创建一个掩码,但由于我将使用滚动 window 函数,我还希望在每个坏值之后有固定数量的后续索引被标记为坏。在下面,n = 3:
我想尽可能高效地执行此操作,因为此例程将在包含数十亿个数据点的大型数据系列上 运行 多次。因此,我需要尽可能接近 numpy 向量化解决方案,因为我想避免 python 循环。
为避免重新输入,这里是数组:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
是这样的吗?
maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright):
mask[l:r] = True
或者,由于 n 很小,因此循环遍历它可能会更好:
maskleft = np.where(np.isnan(a))[0]
mask = np.zeros(len(a),dtype=bool)
for i in range(0,n):
thismask = maskleft+i
mask[thismask] = True
除了对 n 的循环外,上面的内容是完全向量化的。但是该循环是完全可并行化的,因此您可以使用例如获得 n 倍加速。 multiprocessing 或 Cython,如果你愿意麻烦的话。
编辑:根据@askewchan 解决方案 2 可能会导致超出范围的错误。它在 range(0,n)
中也有索引问题。可能的更正:
maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+n,dtype=bool)
for i in range(0, n+1):
thismask = maskleft + i
ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-n]
您可以使用 np.ufunc.reduceat
with np.bitwise_or
:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
m = np.isnan(a)
n = 4
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)
np.bitwise_or.reduceat(m, ind.ravel())[::2]
关于您的数据:
print np.column_stack([m, reduced])
[[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[ True True]
[False True]
[False True]
[False True]
[False False]
[ True True]
[ True True]
[False True]
[False True]
[False True]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[ True True]
[False True]]
这也可以被认为是 morphological dilation problem, using here the scipy.ndimage.binary_dilation
:
def dilation(a, n):
m = np.isnan(a)
s = np.full(n, True, bool)
return ndimage.binary_dilation(m, structure=s, origin=-(n//2))
关于 origin
的注意事项:此参数确保 structure
(我将其称为 内核 )从 structure
的左侧开始一点=16=](你的面具m
)。通常 out[i]
处的值是 structure
中心(即 structure[n//2]
)在 in[i]
处的膨胀,但您希望 structure[0]
是在 in[i]
.
您也可以使用在左侧填充 False
s 的内核执行此操作,如果您使用 binary_dilation
from scikit-image:
则需要这样做
def dilation_skimage(a, n):
m = np.isnan(a)
s = np.zeros(2*n - n%2, bool)
s[-n:] = True
return skimage.morphology.binary_dilation(m, selem=s)
两者之间的时间似乎没有太大变化:
dilation_scipy
small: 10 loops, best of 3: 47.9 ms per loop
large: 10000 loops, best of 3: 88.9 µs per loop
dilation_skimage
small: 10 loops, best of 3: 47.0 ms per loop
large: 10000 loops, best of 3: 91.1 µs per loop
您可以使用与 运行 平均过滤器相同的 cumsum 技巧:
def cumsum_trick(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask)
cs[n:] -= cs[:-n].copy()
return cs > 0
不幸的是,需要额外的 .copy()
,因为 一些内部缓冲 操作顺序。可以说服 numpy 反向应用减法,但要使其工作,cs
数组必须具有负步幅:
def cumsum_trick_nocopy(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask, out=np.empty_like(a, int)[::-1])
cs[n:] -= cs[:-n]
out = cs > 0
return out
但这看起来很脆弱,而且实际上并没有那么快。
我想知道在某处是否有编译信号处理函数可以执行此确切操作..
对于稀疏的初始掩码和小 n
这个也很快:
def index_expansion(a, n):
mask = np.isnan(a)
idx = np.flatnonzero(mask)
expanded_idx = idx[:,None] + np.arange(1, n)
np.put(mask, expanded_idx, True, 'clip')
return mask
又一个答案!
它只是采用您已经拥有的掩码并将逻辑或应用于自身的移位版本。很好地矢量化并且非常快! :D
def repeat_or(a, n=4):
m = np.isnan(a)
k = m.copy()
# lenM and lenK say for each mask how many
# subsequent Trues there are at least
lenM, lenK = 1, 1
# we run until a combination of both masks will give us n or more
# subsequent Trues
while lenM+lenK < n:
# append what we have in k to the end of what we have in m
m[lenM:] |= k[:-lenM]
# swap so that m is again the small one
m, k = k, m
# update the lengths
lenM, lenK = lenK, lenM+lenK
# see how much m has to be shifted in order to append the missing Trues
k[n-lenM:] |= m[:-n+lenM]
return k
不幸的是,我无法得到 m[i:] |= m[:-i]
运行... 修改和使用掩码修改自身可能不是一个好主意。它确实适用于 m[:-i] |= m[i:]
,但这是错误的方向。
无论如何,我们现在有类似斐波那契的增长,而不是二次增长,这仍然比线性增长要好。
(我从没想过我真的会写一个与斐波那契数列真正相关的算法,而不是一些奇怪的数学问题。)
在 "real" 条件下使用大小为 1e6 和 1e5 NAN 的数组进行测试:
In [5]: a = np.random.random(size=1e6)
In [6]: a[np.random.choice(np.arange(len(a), dtype=int), 1e5, replace=False)] = np.nan
In [7]: %timeit reduceat(a)
10 loops, best of 3: 65.2 ms per loop
In [8]: %timeit index_expansion(a)
100 loops, best of 3: 12 ms per loop
In [9]: %timeit cumsum_trick(a)
10 loops, best of 3: 17 ms per loop
In [10]: %timeit repeat_or(a)
1000 loops, best of 3: 1.9 ms per loop
In [11]: %timeit agml_indexing(a)
100 loops, best of 3: 6.91 ms per loop
我会将进一步的基准留给 Thomas。
OP 此处包含基准测试结果。我已经包含了我自己的 ("op"),它是我开始使用的,它循环遍历坏索引并向它们添加 1...n,然后使用唯一值来查找掩码索引。您可以在下面的代码中看到它以及所有其他响应。
不管怎样,这里是结果。方面是数组沿 x 的大小(10 到 10e7)和 window 沿 y(5、50、500、5000)的大小。然后是每个方面的编码器,得分为 log-10,因为我们谈论的是微秒到分钟。
@swenzel 似乎是他的第二个答案的赢家,取代了@moarningsun 的第一个答案(moarningsun 的第二个答案是通过大量内存使用使机器崩溃,但这可能是因为它不是为大 n 或非稀疏设计的一种)。
由于(必要的)对数刻度,该图表没有公正地显示这些贡献中最快的一个。它们甚至比体面的循环解决方案快几十倍、几百倍。在最大的情况下,swenzel1 比 op 快 1000 倍,并且 op 已经在使用 numpy。
请注意,我使用了针对优化的英特尔 MKL 库编译的 numpy 版本,该库充分利用了自 2012 年以来出现的 AVX 指令。在某些矢量用例中,这会将 i7/Xeon 速度提高系数 5。某些贡献可能比其他贡献受益更多。
这是 运行 迄今为止提交的所有答案的完整代码,包括我自己的答案。函数 allagree() 确保结果是正确的,而 timeall() 会给你一个长格式的 pandas 数据框,所有结果都在几秒钟内完成。
您可以很容易地用新代码重新运行它,或者改变我的假设。请记住,我没有考虑内存使用等其他因素。另外,我使用 R ggplot2 来制作图形,因为我对 seaborn/matplotlib 的了解还不足以让它做我想做的事。
为完整起见,所有结果一致:
In [4]: allagree(n = 7, asize = 777)
Out[4]:
AGML0 AGML1 askewchan0 askewchan1 askewchan2 moarningsun0 \
AGML0 True True True True True True
AGML1 True True True True True True
askewchan0 True True True True True True
askewchan1 True True True True True True
askewchan2 True True True True True True
moarningsun0 True True True True True True
swenzel0 True True True True True True
swenzel1 True True True True True True
op True True True True True True
swenzel0 swenzel1 op
AGML0 True True True
AGML1 True True True
askewchan0 True True True
askewchan1 True True True
askewchan2 True True True
moarningsun0 True True True
swenzel0 True True True
swenzel1 True True True
op True True True
感谢所有投稿者!
使用 R 中的 pd.to_csv 和 read.csv 导出 timeall() 输出后的图形代码:
ww <- read.csv("ww.csv")
ggplot(ww, aes(x=coder, y=value, col = coder)) + geom_point(size = 3) + scale_y_continuous(trans="log10")+ facet_grid(nsize ~ asize) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + ggtitle("Fastest by coder") + ylab("time (seconds)")
测试代码:
# test Stack Overflow 32706135 nan shift routines
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from timeit import Timer
from scipy import ndimage
from skimage import morphology
import itertools
import pdb
np.random.seed(8472)
def AGML0(a, n): # loop itertools
maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright):
mask[l:r] = True
return mask
def AGML1(a, n): # loop n
nn = n - 1
maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+nn,dtype=bool)
for i in range(0, nn+1):
thismask = maskleft + i
ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-nn]
return mask
def askewchan0(a, n):
m = np.isnan(a)
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)
return np.bitwise_or.reduceat(m, ind.ravel())[::2]
def askewchan1(a, n):
m = np.isnan(a)
s = np.full(n, True, bool)
return ndimage.binary_dilation(m, structure=s, origin=-(n//2))
def askewchan2(a, n):
m = np.isnan(a)
s = np.zeros(2*n - n%2, bool)
s[-n:] = True
return morphology.binary_dilation(m, selem=s)
def moarningsun0(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask)
cs[n:] -= cs[:-n].copy()
return cs > 0
def moarningsun1(a, n):
mask = np.isnan(a)
idx = np.flatnonzero(mask)
expanded_idx = idx[:,None] + np.arange(1, n)
np.put(mask, expanded_idx, True, 'clip')
return mask
def swenzel0(a, n):
m = np.isnan(a)
k = m.copy()
for i in range(1, n):
k[i:] |= m[:-i]
return k
def swenzel1(a, n=4):
m = np.isnan(a)
k = m.copy()
# lenM and lenK say for each mask how many
# subsequent Trues there are at least
lenM, lenK = 1, 1
# we run until a combination of both masks will give us n or more
# subsequent Trues
while lenM+lenK < n:
# append what we have in k to the end of what we have in m
m[lenM:] |= k[:-lenM]
# swap so that m is again the small one
m, k = k, m
# update the lengths
lenM, lenK = lenK, lenM+lenK
# see how much m has to be shifted in order to append the missing Trues
k[n-lenM:] |= m[:-n+lenM]
return k
def op(a, n):
m = np.isnan(a)
for x in range(1, n):
m = np.logical_or(m, np.r_[False, m][:-1])
return m
# all the functions in a list. NB these are the actual functions, not their names
funcs = [AGML0, AGML1, askewchan0, askewchan1, askewchan2, moarningsun0, swenzel0, swenzel1, op]
def allagree(fns = funcs, n = 10, asize = 100):
""" make sure result is the same from all functions """
fnames = [f.__name__ for f in fns]
a = np.random.rand(asize)
a[np.random.randint(0, asize, int(asize / 10))] = np.nan
results = dict([(f.__name__, f(a, n)) for f in fns])
isgood = [[np.array_equal(results[f1], results[f2]) for f1 in fnames] for f2 in fnames]
pdgood = pd.DataFrame(isgood, columns = fnames, index = fnames)
if not all([all(x) for x in isgood]):
print "not all results identical"
pdb.set_trace()
return pdgood
def timeone(f):
""" time one of the functions across the full range of a nd n """
print "Timing", f.__name__
Ns = np.array([10**x for x in range(0, 4)]) * 5 # 5 to 5000 window size
As = [np.random.rand(10 ** x) for x in range(1, 8)] # up to 10 million data data points
for i in range(len(As)): # 10% of points are always bad
As[i][np.random.randint(0, len(As[i]), len(As[i]) / 10)] = np.nan
results = np.array([[Timer(lambda: f(a, n)).timeit(number = 1) if n < len(a) \
else np.nan for n in Ns] for a in As])
pdresults = pd.DataFrame(results, index = [len(x) for x in As], columns = Ns)
return pdresults
def timeall(fns = funcs):
""" run timeone for all known funcs """
testd = dict([(x.__name__, timeone(x)) for x in fns])
testdf = pd.concat(testd.values(), axis = 0, keys = testd.keys())
testdf.index.names = ["coder", "asize"]
testdf.columns.names = ["nsize"]
testdf.reset_index(inplace = True)
testdf = pd.melt(testdf, id_vars = ["coder", "asize"])
return testdf
晚了几年,但我想出了一个完全矢量化的解决方案,不需要循环或副本(除了掩码本身)。这个解决方案有点(潜在)危险,因为它使用 numpy.lib.stride_tricks.as_strided
. It's also not as fast as .
我们的想法是采用遮罩并创建它的二维视图,其中第二维只是当前元素之后的元素。然后,如果头部是 True
,则可以将整列设置为 True
。由于您正在处理视图,因此设置列实际上会在掩码中设置以下元素。
从数据开始:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
n = 3
现在,我们将使掩码 a.size + n
个元素变长,这样您就不必手动处理最后 n
个元素:
mask = np.empty(a.size + n, dtype=np.bool)
np.isnan(a, out=mask[:a.size])
mask[a.size:] = False
现在最酷的部分是:
view = np.lib.stride_tricks.as_strided(mask, shape=(n + 1, a.size),
strides=mask.strides * 2)
最后一部分很关键。 mask.strides
是一个类似于 (1,)
的元组(因为布尔值通常大约有那么多字节。加倍意味着您需要一个 1 字节的步骤来移动 any[=57 中的一个元素=]维度。
现在您需要做的就是展开遮罩:
view[1:, view[0]] = True
就是这样。现在 mask
有你想要的。请记住,这仅适用于分配索引在最后更改的值之前。你无法逃脱 view[1:] |= view[0]
.
出于基准测试目的,n
的定义似乎已从问题中更改,因此以下函数将其考虑在内:
def madphysicist0(a, n):
m = np.empty(a.size + n - 1, dtype=np.bool)
np.isnan(a, out=m[:a.size])
m[a.size:] = False
v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
v[1:, v[0]] = True
return v[0]
V2
借鉴现有最快的答案,我们只需要复制 log<sub>2</sub>(n)
行,而不是 n
行:
def madphysicist1(a, n):
m = np.empty(a.size + n - 1, dtype=np.bool)
np.isnan(a, out=m[:a.size])
m[a.size:] = False
v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
stop = int(np.log2(n))
for k in range(1, stop + 1):
v[k, v[0]] = True
if (1<<k) < n:
v[-1, v[(1<<k) - 1]] = True
return v[0]
由于每次迭代都会将掩码的大小加倍,因此它比 Fibonacci 快一点:https://math.stackexchange.com/q/894743/295281
假设我有一个长度为 30 的数组,其中有 4 个错误值。我想为那些坏值创建一个掩码,但由于我将使用滚动 window 函数,我还希望在每个坏值之后有固定数量的后续索引被标记为坏。在下面,n = 3:
我想尽可能高效地执行此操作,因为此例程将在包含数十亿个数据点的大型数据系列上 运行 多次。因此,我需要尽可能接近 numpy 向量化解决方案,因为我想避免 python 循环。
为避免重新输入,这里是数组:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
是这样的吗?
maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright):
mask[l:r] = True
或者,由于 n 很小,因此循环遍历它可能会更好:
maskleft = np.where(np.isnan(a))[0]
mask = np.zeros(len(a),dtype=bool)
for i in range(0,n):
thismask = maskleft+i
mask[thismask] = True
除了对 n 的循环外,上面的内容是完全向量化的。但是该循环是完全可并行化的,因此您可以使用例如获得 n 倍加速。 multiprocessing 或 Cython,如果你愿意麻烦的话。
编辑:根据@askewchan 解决方案 2 可能会导致超出范围的错误。它在 range(0,n)
中也有索引问题。可能的更正:
maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+n,dtype=bool)
for i in range(0, n+1):
thismask = maskleft + i
ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-n]
您可以使用 np.ufunc.reduceat
with np.bitwise_or
:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
m = np.isnan(a)
n = 4
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)
np.bitwise_or.reduceat(m, ind.ravel())[::2]
关于您的数据:
print np.column_stack([m, reduced])
[[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[ True True]
[False True]
[False True]
[False True]
[False False]
[ True True]
[ True True]
[False True]
[False True]
[False True]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[ True True]
[False True]]
这也可以被认为是 morphological dilation problem, using here the scipy.ndimage.binary_dilation
:
def dilation(a, n):
m = np.isnan(a)
s = np.full(n, True, bool)
return ndimage.binary_dilation(m, structure=s, origin=-(n//2))
关于 origin
的注意事项:此参数确保 structure
(我将其称为 内核 )从 structure
的左侧开始一点=16=](你的面具m
)。通常 out[i]
处的值是 structure
中心(即 structure[n//2]
)在 in[i]
处的膨胀,但您希望 structure[0]
是在 in[i]
.
您也可以使用在左侧填充 False
s 的内核执行此操作,如果您使用 binary_dilation
from scikit-image:
def dilation_skimage(a, n):
m = np.isnan(a)
s = np.zeros(2*n - n%2, bool)
s[-n:] = True
return skimage.morphology.binary_dilation(m, selem=s)
两者之间的时间似乎没有太大变化:
dilation_scipy
small: 10 loops, best of 3: 47.9 ms per loop
large: 10000 loops, best of 3: 88.9 µs per loop
dilation_skimage
small: 10 loops, best of 3: 47.0 ms per loop
large: 10000 loops, best of 3: 91.1 µs per loop
您可以使用与 运行 平均过滤器相同的 cumsum 技巧:
def cumsum_trick(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask)
cs[n:] -= cs[:-n].copy()
return cs > 0
不幸的是,需要额外的 .copy()
,因为 一些内部缓冲 操作顺序。可以说服 numpy 反向应用减法,但要使其工作,cs
数组必须具有负步幅:
def cumsum_trick_nocopy(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask, out=np.empty_like(a, int)[::-1])
cs[n:] -= cs[:-n]
out = cs > 0
return out
但这看起来很脆弱,而且实际上并没有那么快。
我想知道在某处是否有编译信号处理函数可以执行此确切操作..
对于稀疏的初始掩码和小 n
这个也很快:
def index_expansion(a, n):
mask = np.isnan(a)
idx = np.flatnonzero(mask)
expanded_idx = idx[:,None] + np.arange(1, n)
np.put(mask, expanded_idx, True, 'clip')
return mask
又一个答案!
它只是采用您已经拥有的掩码并将逻辑或应用于自身的移位版本。很好地矢量化并且非常快! :D
def repeat_or(a, n=4):
m = np.isnan(a)
k = m.copy()
# lenM and lenK say for each mask how many
# subsequent Trues there are at least
lenM, lenK = 1, 1
# we run until a combination of both masks will give us n or more
# subsequent Trues
while lenM+lenK < n:
# append what we have in k to the end of what we have in m
m[lenM:] |= k[:-lenM]
# swap so that m is again the small one
m, k = k, m
# update the lengths
lenM, lenK = lenK, lenM+lenK
# see how much m has to be shifted in order to append the missing Trues
k[n-lenM:] |= m[:-n+lenM]
return k
不幸的是,我无法得到 m[i:] |= m[:-i]
运行... 修改和使用掩码修改自身可能不是一个好主意。它确实适用于 m[:-i] |= m[i:]
,但这是错误的方向。
无论如何,我们现在有类似斐波那契的增长,而不是二次增长,这仍然比线性增长要好。
(我从没想过我真的会写一个与斐波那契数列真正相关的算法,而不是一些奇怪的数学问题。)
在 "real" 条件下使用大小为 1e6 和 1e5 NAN 的数组进行测试:
In [5]: a = np.random.random(size=1e6)
In [6]: a[np.random.choice(np.arange(len(a), dtype=int), 1e5, replace=False)] = np.nan
In [7]: %timeit reduceat(a)
10 loops, best of 3: 65.2 ms per loop
In [8]: %timeit index_expansion(a)
100 loops, best of 3: 12 ms per loop
In [9]: %timeit cumsum_trick(a)
10 loops, best of 3: 17 ms per loop
In [10]: %timeit repeat_or(a)
1000 loops, best of 3: 1.9 ms per loop
In [11]: %timeit agml_indexing(a)
100 loops, best of 3: 6.91 ms per loop
我会将进一步的基准留给 Thomas。
OP 此处包含基准测试结果。我已经包含了我自己的 ("op"),它是我开始使用的,它循环遍历坏索引并向它们添加 1...n,然后使用唯一值来查找掩码索引。您可以在下面的代码中看到它以及所有其他响应。
不管怎样,这里是结果。方面是数组沿 x 的大小(10 到 10e7)和 window 沿 y(5、50、500、5000)的大小。然后是每个方面的编码器,得分为 log-10,因为我们谈论的是微秒到分钟。
@swenzel 似乎是他的第二个答案的赢家,取代了@moarningsun 的第一个答案(moarningsun 的第二个答案是通过大量内存使用使机器崩溃,但这可能是因为它不是为大 n 或非稀疏设计的一种)。
由于(必要的)对数刻度,该图表没有公正地显示这些贡献中最快的一个。它们甚至比体面的循环解决方案快几十倍、几百倍。在最大的情况下,swenzel1 比 op 快 1000 倍,并且 op 已经在使用 numpy。
请注意,我使用了针对优化的英特尔 MKL 库编译的 numpy 版本,该库充分利用了自 2012 年以来出现的 AVX 指令。在某些矢量用例中,这会将 i7/Xeon 速度提高系数 5。某些贡献可能比其他贡献受益更多。
这是 运行 迄今为止提交的所有答案的完整代码,包括我自己的答案。函数 allagree() 确保结果是正确的,而 timeall() 会给你一个长格式的 pandas 数据框,所有结果都在几秒钟内完成。
您可以很容易地用新代码重新运行它,或者改变我的假设。请记住,我没有考虑内存使用等其他因素。另外,我使用 R ggplot2 来制作图形,因为我对 seaborn/matplotlib 的了解还不足以让它做我想做的事。
为完整起见,所有结果一致:
In [4]: allagree(n = 7, asize = 777)
Out[4]:
AGML0 AGML1 askewchan0 askewchan1 askewchan2 moarningsun0 \
AGML0 True True True True True True
AGML1 True True True True True True
askewchan0 True True True True True True
askewchan1 True True True True True True
askewchan2 True True True True True True
moarningsun0 True True True True True True
swenzel0 True True True True True True
swenzel1 True True True True True True
op True True True True True True
swenzel0 swenzel1 op
AGML0 True True True
AGML1 True True True
askewchan0 True True True
askewchan1 True True True
askewchan2 True True True
moarningsun0 True True True
swenzel0 True True True
swenzel1 True True True
op True True True
感谢所有投稿者!
使用 R 中的 pd.to_csv 和 read.csv 导出 timeall() 输出后的图形代码:
ww <- read.csv("ww.csv")
ggplot(ww, aes(x=coder, y=value, col = coder)) + geom_point(size = 3) + scale_y_continuous(trans="log10")+ facet_grid(nsize ~ asize) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + ggtitle("Fastest by coder") + ylab("time (seconds)")
测试代码:
# test Stack Overflow 32706135 nan shift routines
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from timeit import Timer
from scipy import ndimage
from skimage import morphology
import itertools
import pdb
np.random.seed(8472)
def AGML0(a, n): # loop itertools
maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright):
mask[l:r] = True
return mask
def AGML1(a, n): # loop n
nn = n - 1
maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+nn,dtype=bool)
for i in range(0, nn+1):
thismask = maskleft + i
ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-nn]
return mask
def askewchan0(a, n):
m = np.isnan(a)
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)
return np.bitwise_or.reduceat(m, ind.ravel())[::2]
def askewchan1(a, n):
m = np.isnan(a)
s = np.full(n, True, bool)
return ndimage.binary_dilation(m, structure=s, origin=-(n//2))
def askewchan2(a, n):
m = np.isnan(a)
s = np.zeros(2*n - n%2, bool)
s[-n:] = True
return morphology.binary_dilation(m, selem=s)
def moarningsun0(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask)
cs[n:] -= cs[:-n].copy()
return cs > 0
def moarningsun1(a, n):
mask = np.isnan(a)
idx = np.flatnonzero(mask)
expanded_idx = idx[:,None] + np.arange(1, n)
np.put(mask, expanded_idx, True, 'clip')
return mask
def swenzel0(a, n):
m = np.isnan(a)
k = m.copy()
for i in range(1, n):
k[i:] |= m[:-i]
return k
def swenzel1(a, n=4):
m = np.isnan(a)
k = m.copy()
# lenM and lenK say for each mask how many
# subsequent Trues there are at least
lenM, lenK = 1, 1
# we run until a combination of both masks will give us n or more
# subsequent Trues
while lenM+lenK < n:
# append what we have in k to the end of what we have in m
m[lenM:] |= k[:-lenM]
# swap so that m is again the small one
m, k = k, m
# update the lengths
lenM, lenK = lenK, lenM+lenK
# see how much m has to be shifted in order to append the missing Trues
k[n-lenM:] |= m[:-n+lenM]
return k
def op(a, n):
m = np.isnan(a)
for x in range(1, n):
m = np.logical_or(m, np.r_[False, m][:-1])
return m
# all the functions in a list. NB these are the actual functions, not their names
funcs = [AGML0, AGML1, askewchan0, askewchan1, askewchan2, moarningsun0, swenzel0, swenzel1, op]
def allagree(fns = funcs, n = 10, asize = 100):
""" make sure result is the same from all functions """
fnames = [f.__name__ for f in fns]
a = np.random.rand(asize)
a[np.random.randint(0, asize, int(asize / 10))] = np.nan
results = dict([(f.__name__, f(a, n)) for f in fns])
isgood = [[np.array_equal(results[f1], results[f2]) for f1 in fnames] for f2 in fnames]
pdgood = pd.DataFrame(isgood, columns = fnames, index = fnames)
if not all([all(x) for x in isgood]):
print "not all results identical"
pdb.set_trace()
return pdgood
def timeone(f):
""" time one of the functions across the full range of a nd n """
print "Timing", f.__name__
Ns = np.array([10**x for x in range(0, 4)]) * 5 # 5 to 5000 window size
As = [np.random.rand(10 ** x) for x in range(1, 8)] # up to 10 million data data points
for i in range(len(As)): # 10% of points are always bad
As[i][np.random.randint(0, len(As[i]), len(As[i]) / 10)] = np.nan
results = np.array([[Timer(lambda: f(a, n)).timeit(number = 1) if n < len(a) \
else np.nan for n in Ns] for a in As])
pdresults = pd.DataFrame(results, index = [len(x) for x in As], columns = Ns)
return pdresults
def timeall(fns = funcs):
""" run timeone for all known funcs """
testd = dict([(x.__name__, timeone(x)) for x in fns])
testdf = pd.concat(testd.values(), axis = 0, keys = testd.keys())
testdf.index.names = ["coder", "asize"]
testdf.columns.names = ["nsize"]
testdf.reset_index(inplace = True)
testdf = pd.melt(testdf, id_vars = ["coder", "asize"])
return testdf
晚了几年,但我想出了一个完全矢量化的解决方案,不需要循环或副本(除了掩码本身)。这个解决方案有点(潜在)危险,因为它使用 numpy.lib.stride_tricks.as_strided
. It's also not as fast as
我们的想法是采用遮罩并创建它的二维视图,其中第二维只是当前元素之后的元素。然后,如果头部是 True
,则可以将整列设置为 True
。由于您正在处理视图,因此设置列实际上会在掩码中设置以下元素。
从数据开始:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
n = 3
现在,我们将使掩码 a.size + n
个元素变长,这样您就不必手动处理最后 n
个元素:
mask = np.empty(a.size + n, dtype=np.bool)
np.isnan(a, out=mask[:a.size])
mask[a.size:] = False
现在最酷的部分是:
view = np.lib.stride_tricks.as_strided(mask, shape=(n + 1, a.size),
strides=mask.strides * 2)
最后一部分很关键。 mask.strides
是一个类似于 (1,)
的元组(因为布尔值通常大约有那么多字节。加倍意味着您需要一个 1 字节的步骤来移动 any[=57 中的一个元素=]维度。
现在您需要做的就是展开遮罩:
view[1:, view[0]] = True
就是这样。现在 mask
有你想要的。请记住,这仅适用于分配索引在最后更改的值之前。你无法逃脱 view[1:] |= view[0]
.
出于基准测试目的,n
的定义似乎已从问题中更改,因此以下函数将其考虑在内:
def madphysicist0(a, n):
m = np.empty(a.size + n - 1, dtype=np.bool)
np.isnan(a, out=m[:a.size])
m[a.size:] = False
v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
v[1:, v[0]] = True
return v[0]
V2
借鉴现有最快的答案,我们只需要复制 log<sub>2</sub>(n)
行,而不是 n
行:
def madphysicist1(a, n):
m = np.empty(a.size + n - 1, dtype=np.bool)
np.isnan(a, out=m[:a.size])
m[a.size:] = False
v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
stop = int(np.log2(n))
for k in range(1, stop + 1):
v[k, v[0]] = True
if (1<<k) < n:
v[-1, v[(1<<k) - 1]] = True
return v[0]
由于每次迭代都会将掩码的大小加倍,因此它比 Fibonacci 快一点:https://math.stackexchange.com/q/894743/295281