将 numpy 掩码有效地扩展到每个错误值的右侧 n 个单元格

Extend numpy mask by n cells to the right for each bad value, efficiently

假设我有一个长度为 30 的数组,其中有 4 个错误值。我想为那些坏值创建一个掩码,但由于我将使用滚动 window 函数,我还希望在每个坏值之后有固定数量的后续索引被标记为坏。在下面,n = 3:

我想尽可能高效地执行此操作,因为此例程将在包含数十亿个数据点的大型数据系列上 运行 多次。因此,我需要尽可能接近 numpy 向量化解决方案,因为我想避免 python 循环。

为避免重新输入,这里是数组:

import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])

是这样的吗?

maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright): 
   mask[l:r] = True

或者,由于 n 很小,因此循环遍历它可能会更好:

maskleft = np.where(np.isnan(a))[0]
mask = np.zeros(len(a),dtype=bool)
for i in range(0,n):
  thismask = maskleft+i
  mask[thismask] = True

除了对 n 的循环外,上面的内容是完全向量化的。但是该循环是完全可并行化的,因此您可以使用例如获得 n 倍加速。 multiprocessing 或 Cython,如果你愿意麻烦的话。

编辑:根据@askewchan 解决方案 2 可能会导致超出范围的错误。它在 range(0,n) 中也有索引问题。可能的更正:

maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+n,dtype=bool)
for i in range(0, n+1):
    thismask = maskleft + i
    ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-n]

您可以使用 np.ufunc.reduceat with np.bitwise_or:

import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
m = np.isnan(a)
n = 4
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)

np.bitwise_or.reduceat(m, ind.ravel())[::2]

关于您的数据:

print np.column_stack([m, reduced])
[[False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [ True  True]
 [False  True]
 [False  True]
 [False  True]
 [False False]
 [ True  True]
 [ True  True]
 [False  True]
 [False  True]
 [False  True]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [ True  True]
 [False  True]]

这也可以被认为是 morphological dilation problem, using here the scipy.ndimage.binary_dilation:

def dilation(a, n):
    m = np.isnan(a)
    s = np.full(n, True, bool)
    return ndimage.binary_dilation(m, structure=s, origin=-(n//2))

关于 origin 的注意事项:此参数确保 structure(我将其称为 内核 )从 structure 的左侧开始一点=16=](你的面具m)。通常 out[i] 处的值是 structure 中心(即 structure[n//2])在 in[i] 处的膨胀,但您希望 structure[0] 是在 in[i].

您也可以使用在左侧填充 Falses 的内核执行此操作,如果您使用 binary_dilation from scikit-image:

则需要这样做
def dilation_skimage(a, n):
    m = np.isnan(a)
    s = np.zeros(2*n - n%2, bool)
    s[-n:] = True
    return skimage.morphology.binary_dilation(m, selem=s)

两者之间的时间似乎没有太大变化:

dilation_scipy
small:    10 loops, best of 3: 47.9 ms per loop
large: 10000 loops, best of 3: 88.9 µs per loop

dilation_skimage
small:    10 loops, best of 3: 47.0 ms per loop
large: 10000 loops, best of 3: 91.1 µs per loop

您可以使用与 运行 平均过滤器相同的 cumsum 技巧:

def cumsum_trick(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask)
    cs[n:] -= cs[:-n].copy()
    return cs > 0

不幸的是,需要额外的 .copy(),因为 一些内部缓冲 操作顺序。可以说服 numpy 反向应用减法,但要使其工作,cs 数组必须具有负步幅:

def cumsum_trick_nocopy(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask, out=np.empty_like(a, int)[::-1])
    cs[n:] -= cs[:-n]
    out = cs > 0
    return out

但这看起来很脆弱,而且实际上并没有那么快。

我想知道在某处是否有编译信号处理函数可以执行此确切操作..


对于稀疏的初始掩码和小 n 这个也很快:

def index_expansion(a, n):
    mask = np.isnan(a)
    idx = np.flatnonzero(mask)
    expanded_idx = idx[:,None] + np.arange(1, n)
    np.put(mask, expanded_idx, True, 'clip')
    return mask

又一个答案!
它只是采用您已经拥有的掩码并将逻辑或应用于自身的移位版本。很好地矢量化并且非常快! :D

def repeat_or(a, n=4):
    m = np.isnan(a)
    k = m.copy()

    # lenM and lenK say for each mask how many
    # subsequent Trues there are at least
    lenM, lenK = 1, 1

    # we run until a combination of both masks will give us n or more
    # subsequent Trues
    while lenM+lenK < n:
        # append what we have in k to the end of what we have in m
        m[lenM:] |= k[:-lenM]

        # swap so that m is again the small one
        m, k = k, m

        # update the lengths
        lenM, lenK = lenK, lenM+lenK

    # see how much m has to be shifted in order to append the missing Trues
    k[n-lenM:] |= m[:-n+lenM]

    return k

不幸的是,我无法得到 m[i:] |= m[:-i] 运行... 修改和使用掩码修改自身可能不是一个好主意。它确实适用于 m[:-i] |= m[i:],但这是错误的方向。
无论如何,我们现在有类似斐波那契的增长,而不是二次增长,这仍然比线性增长要好。
(我从没想过我真的会写一个与斐波那契数列真正相关的算法,而不是一些奇怪的数学问题。)

在 "real" 条件下使用大小为 1e6 和 1e5 NAN 的数组进行测试:

In [5]: a = np.random.random(size=1e6)

In [6]: a[np.random.choice(np.arange(len(a), dtype=int), 1e5, replace=False)] = np.nan

In [7]: %timeit reduceat(a)
10 loops, best of 3: 65.2 ms per loop

In [8]: %timeit index_expansion(a)
100 loops, best of 3: 12 ms per loop

In [9]: %timeit cumsum_trick(a)
10 loops, best of 3: 17 ms per loop

In [10]: %timeit repeat_or(a)
1000 loops, best of 3: 1.9 ms per loop

In [11]: %timeit agml_indexing(a)
100 loops, best of 3: 6.91 ms per loop

我会将进一步的基准留给 Thomas。

OP 此处包含基准测试结果。我已经包含了我自己的 ("op"),它是我开始使用的,它循环遍历坏索引并向它们添加 1...n,然后使用唯一值来查找掩码索引。您可以在下面的代码中看到它以及所有其他响应。

不管怎样,这里是结果。方面是数组沿 x 的大小(10 到 10e7)和 window 沿 y(5、50、500、5000)的大小。然后是每个方面的编码器,得分为 log-10,因为我们谈论的是微秒到分钟。

@swenzel 似乎是他的第二个答案的赢家,取代了@moarningsun 的第一个答案(moarningsun 的第二个答案是通过大量内存使用使机器崩溃,但这可能是因为它不是为大 n 或非稀疏设计的一种)。

由于(必要的)对数刻度,该图表没有公正地显示这些贡献中最快的一个。它们甚至比体面的循环解决方案快几十倍、几百倍。在最大的情况下,swenzel1 比 op 快 1000 倍,并且 op 已经在使用 numpy。

请注意,我使用了针对优化的英特尔 MKL 库编译的 numpy 版本,该库充分利用了自 2012 年以来出现的 AVX 指令。在某些矢量用例中,这会将 i7/Xeon 速度提高系数 5。某些贡献可能比其他贡献受益更多。

这是 运行 迄今为止提交的所有答案的完整代码,包括我自己的答案。函数 allagree() 确保结果是正确的,而 timeall() 会给你一个长格式的 pandas 数据框,所有结果都在几秒钟内完成。

您可以很容易地用新代码重新运行它,或者改变我的假设。请记住,我没有考虑内存使用等其他因素。另外,我使用 R ggplot2 来制作图形,因为我对 seaborn/matplotlib 的了解还不足以让它做我想做的事。

为完整起见,所有结果一致:

In [4]: allagree(n = 7, asize = 777)
Out[4]:
             AGML0 AGML1 askewchan0 askewchan1 askewchan2 moarningsun0  \
AGML0         True  True       True       True       True         True
AGML1         True  True       True       True       True         True
askewchan0    True  True       True       True       True         True
askewchan1    True  True       True       True       True         True
askewchan2    True  True       True       True       True         True
moarningsun0  True  True       True       True       True         True
swenzel0      True  True       True       True       True         True
swenzel1      True  True       True       True       True         True
op            True  True       True       True       True         True

             swenzel0 swenzel1    op
AGML0            True     True  True
AGML1            True     True  True
askewchan0       True     True  True
askewchan1       True     True  True
askewchan2       True     True  True
moarningsun0     True     True  True
swenzel0         True     True  True
swenzel1         True     True  True
op               True     True  True

感谢所有投稿者!

使用 R 中的 pd.to_csv 和 read.csv 导出 timeall() 输出后的图形代码:

ww <- read.csv("ww.csv")    
ggplot(ww, aes(x=coder, y=value, col = coder)) + geom_point(size = 3) + scale_y_continuous(trans="log10")+ facet_grid(nsize ~ asize) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + ggtitle("Fastest by coder") + ylab("time (seconds)")

测试代码:

# test Stack Overflow 32706135 nan shift routines

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from timeit import Timer
from scipy import ndimage
from skimage import morphology
import itertools
import pdb
np.random.seed(8472)


def AGML0(a, n):                               # loop itertools
    maskleft = np.where(np.isnan(a))[0]
    maskright = maskleft + n
    mask = np.zeros(len(a),dtype=bool)
    for l,r in itertools.izip(maskleft,maskright): 
        mask[l:r] = True
    return mask


def AGML1(a, n):                               # loop n
    nn = n - 1
    maskleft = np.where(np.isnan(a))[0]
    ghost_mask = np.zeros(len(a)+nn,dtype=bool)
    for i in range(0, nn+1):
        thismask = maskleft + i
        ghost_mask[thismask] = True
    mask = ghost_mask[:len(ghost_mask)-nn]
    return mask


def askewchan0(a, n):
    m = np.isnan(a)
    i = np.arange(1, len(m)+1)
    ind = np.column_stack([i-n, i]) # may be a faster way to generate this
    ind.clip(0, len(m)-1, out=ind)
    return np.bitwise_or.reduceat(m, ind.ravel())[::2]


def askewchan1(a, n):
    m = np.isnan(a)
    s = np.full(n, True, bool)
    return ndimage.binary_dilation(m, structure=s, origin=-(n//2))


def askewchan2(a, n):
    m = np.isnan(a)
    s = np.zeros(2*n - n%2, bool)
    s[-n:] = True
    return morphology.binary_dilation(m, selem=s)


def moarningsun0(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask)
    cs[n:] -= cs[:-n].copy()
    return cs > 0


def moarningsun1(a, n):
    mask = np.isnan(a)
    idx = np.flatnonzero(mask)
    expanded_idx = idx[:,None] + np.arange(1, n)
    np.put(mask, expanded_idx, True, 'clip')
    return mask


def swenzel0(a, n):
    m = np.isnan(a)
    k = m.copy()
    for i in range(1, n):
        k[i:] |= m[:-i]
    return k


def swenzel1(a, n=4):
    m = np.isnan(a)
    k = m.copy()

    # lenM and lenK say for each mask how many
    # subsequent Trues there are at least
    lenM, lenK = 1, 1

    # we run until a combination of both masks will give us n or more
    # subsequent Trues
    while lenM+lenK < n:
        # append what we have in k to the end of what we have in m
        m[lenM:] |= k[:-lenM]

        # swap so that m is again the small one
        m, k = k, m

        # update the lengths
        lenM, lenK = lenK, lenM+lenK

    # see how much m has to be shifted in order to append the missing Trues
    k[n-lenM:] |= m[:-n+lenM]
    return k


def op(a, n):
    m = np.isnan(a)
    for x in range(1, n):
        m = np.logical_or(m, np.r_[False, m][:-1])
    return m


# all the functions in a list. NB these are the actual functions, not their names
funcs = [AGML0, AGML1, askewchan0, askewchan1, askewchan2, moarningsun0, swenzel0, swenzel1, op]

def allagree(fns = funcs, n = 10, asize = 100):
    """ make sure result is the same from all functions """
    fnames = [f.__name__ for f in fns]
    a = np.random.rand(asize)
    a[np.random.randint(0, asize, int(asize / 10))] = np.nan
    results = dict([(f.__name__, f(a, n)) for f in fns])
    isgood = [[np.array_equal(results[f1], results[f2]) for f1 in fnames] for f2 in fnames]
    pdgood = pd.DataFrame(isgood, columns = fnames, index = fnames)
    if not all([all(x) for x in isgood]):
        print "not all results identical"
        pdb.set_trace()
    return pdgood


def timeone(f):
    """ time one of the functions across the full range of a nd n """
    print "Timing", f.__name__
    Ns = np.array([10**x for x in range(0, 4)]) * 5 # 5 to 5000 window size
    As = [np.random.rand(10 ** x) for x in range(1, 8)] # up to 10 million data data points
    for i in range(len(As)): # 10% of points are always bad
        As[i][np.random.randint(0, len(As[i]), len(As[i]) / 10)] = np.nan
    results = np.array([[Timer(lambda: f(a, n)).timeit(number = 1) if n < len(a) \
                        else np.nan for n in Ns] for a in As])
    pdresults = pd.DataFrame(results, index = [len(x) for x in As], columns = Ns)
    return pdresults


def timeall(fns = funcs):
    """ run timeone for all known funcs """
    testd = dict([(x.__name__, timeone(x)) for x in fns])
    testdf = pd.concat(testd.values(), axis = 0, keys = testd.keys())
    testdf.index.names = ["coder", "asize"]
    testdf.columns.names = ["nsize"]
    testdf.reset_index(inplace = True)
    testdf = pd.melt(testdf, id_vars = ["coder", "asize"])
    return testdf

晚了几年,但我想出了一个完全矢量化的解决方案,不需要循环或副本(除了掩码本身)。这个解决方案有点(潜在)危险,因为它使用 numpy.lib.stride_tricks.as_strided. It's also not as fast as .

我们的想法是采用遮罩并创建它的二维视图,其中第二维只是当前元素之后的元素。然后,如果头部是 True,则可以将整列设置为 True。由于您正在处理视图,因此设置列实际上会在掩码中设置以下元素。

从数据开始:

import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
n = 3

现在,我们将使掩码 a.size + n 个元素变长,这样您就不必手动处理最后 n 个元素:

mask = np.empty(a.size + n, dtype=np.bool)
np.isnan(a, out=mask[:a.size])
mask[a.size:] = False

现在最酷的部分是:

view = np.lib.stride_tricks.as_strided(mask, shape=(n + 1, a.size),
                                       strides=mask.strides * 2)

最后一部分很关键。 mask.strides 是一个类似于 (1,) 的元组(因为布尔值通常大约有那么多字节。加倍意味着您需要一个 1 字节的步骤来移动 any[=57 中的一个元素=]维度。

现在您需要做的就是展开遮罩:

view[1:, view[0]] = True

就是这样。现在 mask 有你想要的。请记住,这仅适用于分配索引在最后更改的值之前。你无法逃脱 view[1:] |= view[0].

出于基准测试目的,n 的定义似乎已从问题中更改,因此以下函数将其考虑在内:

def madphysicist0(a, n):
    m = np.empty(a.size + n - 1, dtype=np.bool)
    np.isnan(a, out=m[:a.size])
    m[a.size:] = False

    v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
    v[1:, v[0]] = True
    return v[0]

V2

借鉴现有最快的答案,我们只需要复制 log<sub>2</sub>(n) 行,而不是 n 行:

def madphysicist1(a, n):
    m = np.empty(a.size + n - 1, dtype=np.bool)
    np.isnan(a, out=m[:a.size])
    m[a.size:] = False

    v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)

    stop = int(np.log2(n))
    for k in range(1, stop + 1):
        v[k, v[0]] = True
    if (1<<k) < n:
        v[-1, v[(1<<k) - 1]] = True
    return v[0]

由于每次迭代都会将掩码的大小加倍,因此它比 Fibonacci 快一点:https://math.stackexchange.com/q/894743/295281