重采样(上采样、插值)一系列数字
Resampling (upsampling, interpolating) a series of numbers
我有一个逗号分隔的整数值系列,我想对它们进行重新采样,以便我有两倍的数量,其中在每个现有值之间添加一个新值。例如,如果这是我的来源:
1,5,11,9,13,21
结果将是:
1,3,5,8,11,10,9,11,13,17,21
如果不清楚,我尝试在我的源系列中的每个值之间添加一个数字,如下所示:
1 5 11 9 13 21
1 3 5 8 11 10 9 11 13 17 21
我搜索了很多,似乎 scipy.signal.resample 或 panda 之类的东西应该可以工作,但我对此完全陌生,无法让它工作。例如,这是我对 scipy:
的尝试之一
import numpy as np
from scipy import signal
InputFileName = "sample.raw"
DATA250 = np.loadtxt(InputFileName, delimiter=',', dtype=int);
print(DATA250)
DATA500 = signal.resample(DATA250, 11)
print(DATA500)
输出:
[ 1 5 11 9 13 21]
[ 1. -0.28829461 6.12324489 10.43251996 10.9108191 9.84503237
8.40293529 10.7641676 18.44182898 21.68506897 12.68267746]
显然我使用 signal.resample 不正确。有什么方法可以用 signal.resample 或 panda 做到这一点吗?我应该使用其他方法吗?
此外,在我的示例中,所有源数字之间都有一个整数。在我的实际数据中,情况并非如此。因此,如果其中两个数字是 10,15,则新数字将为 12.5。但是我希望所有结果数字都是整数。所以插入的新数字需要是 12 或 13(对我来说是哪个并不重要)。
请注意,一旦我完成这项工作,源文件实际上将是一个由逗号分隔的 2,000 个数字列表,输出应该是 4,000 个数字(或者技术上是 3,999,因为不会在末尾添加一个)。此外,这将用于处理类似于 ECG 记录的内容 - 目前 ECG 以 250 Hz 的频率采样 8 秒,然后将其传递给单独的进程以分析记录。然而,这个单独的过程需要以 500 Hz 的频率对录音进行采样。所以工作流程是,我将每 8 秒进行一次 250 Hz 的记录,并将其上采样到 500 Hz,然后将结果输出传递给分析过程。
感谢您提供的任何指导。
由于插值简单,您可以手动完成:
import numpy as np
a = np.array([1,5,11,9,13,21])
b = np.zeros(2*len(a)-1, dtype=np.uint32)
b[0::2] = a
b[1::2] = (a[:-1] + a[1:]) // 2
您也可以这样使用scipy.signal.resample
:
import numpy as np
from scipy import signal
a = np.array([1,5,11,9,13,21])
b = signal.resample(a, len(a) * 2)
b_int = b.astype(int)
诀窍是恰好有两倍数量的元素,以便奇数点与您的初始点匹配。此外,我认为 scipy.signal.resample
完成的傅立叶插值比您要求的线性插值更适合您的 ECG 信号。
由于您提出了 pandas 解决方案,因此有一种可能性:
import pandas as pd
import numpy as np
l = [1,4,11,9,14,21]
n = len(l)
df = pd.DataFrame(l, columns = ["l"]).reindex(np.linspace(0, n-1, 2*n-1)).interpolate().astype(int)
print(df)
虽然感觉没必要复杂。我在 pandas 中标记,以便更熟悉 pandas 功能的人看到它。
虽然我可能只是在这里使用 NumPy,与 非常相似,但实际上您不必这样做。
首先,您可以仅使用 csv
模块读取一行逗号分隔的数字:
with open(path) as f:
numbers = map(int, next(csv.reader(f))
... 或者只是字符串操作:
with open(path) as f:
numbers = map(int, next(f).split(','))
然后您可以轻松地进行插值:
def interpolate(numbers):
last = None
for number in numbers:
if last is not None:
yield (last+number)//2
yield number
last=number
如果您希望它完全通用且可重用,只需使用 function
参数和 yield function(last, number)
,并将 None
替换为 sentinel = object()
。
现在,您需要做的就是 join
结果和 write
结果:
with open(outpath, 'w') as f:
f.write(','.join(map(str, interpolate(numbers))))
这个解决方案有什么优势吗?好吧,除了 read/split 和 join/write 之外,它纯粹是懒惰的。而且我们可以很容易地编写惰性拆分和连接函数(或者只是手动完成)。因此,如果您不得不处理十亿个以逗号分隔的数字而不是一千个,那么您只需更改即可。
这里偷懒split
:
def isplit(s, sep):
start = 0
while True:
nextpos = s.find(sep, start)
if nextpos == -1:
yield s[start:]
return
yield s[start:nextpos]
start=nextpos+1
并且您可以使用 mmap
作为惰性读取字符串(好吧,bytes
,但我们的数据是纯 ASCII,所以没关系):
with open(path, 'rb') as f:
with mmap.mmap(inf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
numbers = map(int, isplit(mm, b','))
为了多样化,让我们使用不同的解决方案来懒惰写作:
def icsvwrite(f, seq, sep=','):
first = next(seq, None)
if not first: return
f.write(first)
for value in seq:
f.write(sep)
f.write(value)
所以,把它们放在一起:
with open(inpath, 'rb') as inf, open(outpath, 'w') as outf:
with mmap.mmap(inf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
numbers = map(int, isplit(mm, b','))
icsvwrite(outf, map(str, interpolate(numbers)))
但是,即使我能够很快地将它们组合在一起,并且所有部分都可以很好地重复使用,我 仍然 可能会使用 NumPy 来解决您的特定问题。您不会读取一行十亿个数字。您已经在唯一一台要 运行 此脚本的机器上安装了 NumPy。每 8 秒导入一次的成本(您可以通过让脚本在 运行 秒之间休眠来解决)。因此,很难击败优雅的 3 行解决方案。
我有一个逗号分隔的整数值系列,我想对它们进行重新采样,以便我有两倍的数量,其中在每个现有值之间添加一个新值。例如,如果这是我的来源:
1,5,11,9,13,21
结果将是:
1,3,5,8,11,10,9,11,13,17,21
如果不清楚,我尝试在我的源系列中的每个值之间添加一个数字,如下所示:
1 5 11 9 13 21
1 3 5 8 11 10 9 11 13 17 21
我搜索了很多,似乎 scipy.signal.resample 或 panda 之类的东西应该可以工作,但我对此完全陌生,无法让它工作。例如,这是我对 scipy:
的尝试之一import numpy as np
from scipy import signal
InputFileName = "sample.raw"
DATA250 = np.loadtxt(InputFileName, delimiter=',', dtype=int);
print(DATA250)
DATA500 = signal.resample(DATA250, 11)
print(DATA500)
输出:
[ 1 5 11 9 13 21]
[ 1. -0.28829461 6.12324489 10.43251996 10.9108191 9.84503237
8.40293529 10.7641676 18.44182898 21.68506897 12.68267746]
显然我使用 signal.resample 不正确。有什么方法可以用 signal.resample 或 panda 做到这一点吗?我应该使用其他方法吗?
此外,在我的示例中,所有源数字之间都有一个整数。在我的实际数据中,情况并非如此。因此,如果其中两个数字是 10,15,则新数字将为 12.5。但是我希望所有结果数字都是整数。所以插入的新数字需要是 12 或 13(对我来说是哪个并不重要)。
请注意,一旦我完成这项工作,源文件实际上将是一个由逗号分隔的 2,000 个数字列表,输出应该是 4,000 个数字(或者技术上是 3,999,因为不会在末尾添加一个)。此外,这将用于处理类似于 ECG 记录的内容 - 目前 ECG 以 250 Hz 的频率采样 8 秒,然后将其传递给单独的进程以分析记录。然而,这个单独的过程需要以 500 Hz 的频率对录音进行采样。所以工作流程是,我将每 8 秒进行一次 250 Hz 的记录,并将其上采样到 500 Hz,然后将结果输出传递给分析过程。
感谢您提供的任何指导。
由于插值简单,您可以手动完成:
import numpy as np
a = np.array([1,5,11,9,13,21])
b = np.zeros(2*len(a)-1, dtype=np.uint32)
b[0::2] = a
b[1::2] = (a[:-1] + a[1:]) // 2
您也可以这样使用scipy.signal.resample
:
import numpy as np
from scipy import signal
a = np.array([1,5,11,9,13,21])
b = signal.resample(a, len(a) * 2)
b_int = b.astype(int)
诀窍是恰好有两倍数量的元素,以便奇数点与您的初始点匹配。此外,我认为 scipy.signal.resample
完成的傅立叶插值比您要求的线性插值更适合您的 ECG 信号。
由于您提出了 pandas 解决方案,因此有一种可能性:
import pandas as pd
import numpy as np
l = [1,4,11,9,14,21]
n = len(l)
df = pd.DataFrame(l, columns = ["l"]).reindex(np.linspace(0, n-1, 2*n-1)).interpolate().astype(int)
print(df)
虽然感觉没必要复杂。我在 pandas 中标记,以便更熟悉 pandas 功能的人看到它。
虽然我可能只是在这里使用 NumPy,与
首先,您可以仅使用 csv
模块读取一行逗号分隔的数字:
with open(path) as f:
numbers = map(int, next(csv.reader(f))
... 或者只是字符串操作:
with open(path) as f:
numbers = map(int, next(f).split(','))
然后您可以轻松地进行插值:
def interpolate(numbers):
last = None
for number in numbers:
if last is not None:
yield (last+number)//2
yield number
last=number
如果您希望它完全通用且可重用,只需使用 function
参数和 yield function(last, number)
,并将 None
替换为 sentinel = object()
。
现在,您需要做的就是 join
结果和 write
结果:
with open(outpath, 'w') as f:
f.write(','.join(map(str, interpolate(numbers))))
这个解决方案有什么优势吗?好吧,除了 read/split 和 join/write 之外,它纯粹是懒惰的。而且我们可以很容易地编写惰性拆分和连接函数(或者只是手动完成)。因此,如果您不得不处理十亿个以逗号分隔的数字而不是一千个,那么您只需更改即可。
这里偷懒split
:
def isplit(s, sep):
start = 0
while True:
nextpos = s.find(sep, start)
if nextpos == -1:
yield s[start:]
return
yield s[start:nextpos]
start=nextpos+1
并且您可以使用 mmap
作为惰性读取字符串(好吧,bytes
,但我们的数据是纯 ASCII,所以没关系):
with open(path, 'rb') as f:
with mmap.mmap(inf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
numbers = map(int, isplit(mm, b','))
为了多样化,让我们使用不同的解决方案来懒惰写作:
def icsvwrite(f, seq, sep=','):
first = next(seq, None)
if not first: return
f.write(first)
for value in seq:
f.write(sep)
f.write(value)
所以,把它们放在一起:
with open(inpath, 'rb') as inf, open(outpath, 'w') as outf:
with mmap.mmap(inf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
numbers = map(int, isplit(mm, b','))
icsvwrite(outf, map(str, interpolate(numbers)))
但是,即使我能够很快地将它们组合在一起,并且所有部分都可以很好地重复使用,我 仍然 可能会使用 NumPy 来解决您的特定问题。您不会读取一行十亿个数字。您已经在唯一一台要 运行 此脚本的机器上安装了 NumPy。每 8 秒导入一次的成本(您可以通过让脚本在 运行 秒之间休眠来解决)。因此,很难击败优雅的 3 行解决方案。