重采样(上采样、插值)一系列数字

Resampling (upsampling, interpolating) a series of numbers

我有一个逗号分隔的整数值系列,我想对它们进行重新采样,以便我有两倍的数量,其中在每个现有值之间添加一个新值。例如,如果这是我的来源:

1,5,11,9,13,21

结果将是:

1,3,5,8,11,10,9,11,13,17,21

如果不清楚,我尝试在我的源系列中的每个值之间添加一个数字,如下所示:

1   5   11    9    13    21
1 3 5 8 11 10 9 11 13 17 21

我搜索了很多,似乎 scipy.signal.resample 或 panda 之类的东西应该可以工作,但我对此完全陌生,无法让它工作。例如,这是我对 scipy:

的尝试之一
import numpy as np
from scipy import signal
InputFileName = "sample.raw"
DATA250  = np.loadtxt(InputFileName, delimiter=',', dtype=int);
print(DATA250)
DATA500 = signal.resample(DATA250, 11)
print(DATA500)

输出:

[ 1  5 11  9 13 21]
[ 1.         -0.28829461  6.12324489 10.43251996 10.9108191   9.84503237
  8.40293529 10.7641676  18.44182898 21.68506897 12.68267746]

显然我使用 signal.resample 不正确。有什么方法可以用 signal.resample 或 panda 做到这一点吗?我应该使用其他方法吗?

此外,在我的示例中,所有源数字之间都有一个整数。在我的实际数据中,情况并非如此。因此,如果其中两个数字是 10,15,则新数字将为 12.5。但是我希望所有结果数字都是整数。所以插入的新数字需要是 12 或 13(对我来说是哪个并不重要)。

请注意,一旦我完成这项工作,源文件实际上将是一个由逗号分隔的 2,000 个数字列表,输出应该是 4,000 个数字(或者技术上是 3,999,因为不会在末尾添加一个)。此外,这将用于处理类似于 ECG 记录的内容 - 目前 ECG 以 250 Hz 的频率采样 8 秒,然后将其传递给单独的进程以分析记录。然而,这个单独的过程需要以 500 Hz 的频率对录音进行采样。所以工作流程是,我将每 8 秒进行一次 250 Hz 的记录,并将其上采样到 500 Hz,然后将结果输出传递给分析过程。

感谢您提供的任何指导。

由于插值简单,您可以手动完成:

import numpy as np
a = np.array([1,5,11,9,13,21])
b = np.zeros(2*len(a)-1, dtype=np.uint32)
b[0::2] = a
b[1::2] = (a[:-1] + a[1:]) // 2

您也可以这样使用scipy.signal.resample

import numpy as np
from scipy import signal
a = np.array([1,5,11,9,13,21])
b = signal.resample(a, len(a) * 2)
b_int = b.astype(int)

诀窍是恰好有两倍数量的元素,以便奇数点与您的初始点匹配。此外,我认为 scipy.signal.resample 完成的傅立叶插值比您要求的线性插值更适合您的 ECG 信号。

由于您提出了 pandas 解决方案,因此有一种可能性:

import pandas as pd
import numpy as np

l = [1,4,11,9,14,21]
n = len(l)

df = pd.DataFrame(l, columns = ["l"]).reindex(np.linspace(0, n-1, 2*n-1)).interpolate().astype(int)

print(df)

虽然感觉没必要复杂。我在 pandas 中标记,以便更熟悉 pandas 功能的人看到它。

虽然我可能只是在这里使用 NumPy,与 非常相似,但实际上您不必这样做。


首先,您可以仅使用 csv 模块读取一行逗号分隔的数字:

with open(path) as f:
    numbers = map(int, next(csv.reader(f))

... 或者只是字符串操作:

with open(path) as f:
    numbers = map(int, next(f).split(','))

然后您可以轻松地进行插值:

def interpolate(numbers):
    last = None
    for number in numbers:
        if last is not None:
            yield (last+number)//2
        yield number
        last=number

如果您希望它完全通用且可重用,只需使用 function 参数和 yield function(last, number),并将 None 替换为 sentinel = object()


现在,您需要做的就是 join 结果和 write 结果:

with open(outpath, 'w') as f:
    f.write(','.join(map(str, interpolate(numbers))))

这个解决方案有什么优势吗?好吧,除了 read/split 和 join/write 之外,它纯粹是懒惰的。而且我们可以很容易地编写惰性拆分和连接函数(或者只是手动完成)。因此,如果您不得不处理十亿个以逗号分隔的数字而不是一千个,那么您只需更改即可。

这里偷懒split:

def isplit(s, sep):
    start = 0
    while True:
        nextpos = s.find(sep, start)
        if nextpos == -1:
            yield s[start:]
            return
        yield s[start:nextpos]
        start=nextpos+1

并且您可以使用 mmap 作为惰性读取字符串(好吧,bytes,但我们的数据是纯 ASCII,所以没关系):

with open(path, 'rb') as f:
    with mmap.mmap(inf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        numbers = map(int, isplit(mm, b','))

为了多样化,让我们使用不同的解决方案来懒惰写作:

def icsvwrite(f, seq, sep=','):
    first = next(seq, None)
    if not first: return
    f.write(first)
    for value in seq:
        f.write(sep)
        f.write(value)

所以,把它们放在一起:

with open(inpath, 'rb') as inf, open(outpath, 'w') as outf:
    with mmap.mmap(inf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        numbers = map(int, isplit(mm, b','))
        icsvwrite(outf, map(str, interpolate(numbers)))

但是,即使我能够很快地将它们组合在一起,并且所有部分都可以很好地重复使用,我 仍然 可能会使用 NumPy 来解决您的特定问题。您不会读取一行十亿个数字。您已经在唯一一台要 运行 此脚本的机器上安装了 NumPy。每 8 秒导入一次的成本(您可以通过让脚本在 运行 秒之间休眠来解决)。因此,很难击败优雅的 3 行解决方案。