滚动 window 多项式拟合 pandas

Rolling window polynomial fit in pandas

我正在尝试根据应用于 t 天 window 的 n 度多项式计算系数时间序列。但是,我收到一个异常 TypeError: only length-1 arrays can be converted to Python scalars

我的版本是:

代码:

import pandas as pd
import numpy as np
my_ts = pd.Series(data = np.random.normal(size = 365 * 2), index = pd.date_range(start = '2013-01-01', periods = 365 * 2))
coefs = pd.rolling_apply(my_ts, 21, lambda x: np.polyfit(range(len(x)), x, 3))

然而,当我包装 np.polyfit 以便它 return 只有一个系数时,rolling_apply 没有问题。

def pf_wrapper(x):
    coef_lst = np.polyfit(range(len(x)), x, 3)
    return coef_lst[0]
coefs = pd.rolling_apply(my_ts, 21, pf_wrapper)

更新:

由于pd.rolling_apply()无法return非标量,我目前的解决方案如下:

def get_beta(ts, deg):
    coefs = polyfit(range(len(ts)), ts, deg = 3)[::-1]
    return coefs[deg]

b0 = pd.rolling_apply(my_ts, 21, lambda x: get_beta(x, 0))
...
b3 = pd.rolling_apply(my_ts, 21, lambda x: get_beta(x, 3))

我认为 rolling_apply 不可能。 documentation 表示应用函数 "must produce a single value from an ndarray input"。它的实际意思似乎是"must produce a value that is or can be converted into a single float"。如果您跟进完整的异常回溯,它会将您带到 algos.pyx:

中的这段代码
output = np.empty(n, dtype=float)
counts = roll_sum(np.isfinite(input).astype(float), win, minp)

bufarr = np.empty(win, dtype=float)
oldbuf = <float64_t*> bufarr.data

n = len(input)
for i from 0 <= i < int_min(win, n):
    if counts[i] >= minp:
        output[i] = func(input[int_max(i - win + 1, 0) : i + 1], *args,
                         **kwargs)
    else:
        output[i] = NaN

错误出现在 output[i] = func(...) 行。您可以看到输出数组被硬编码为 dtype float。您收到的错误与您尝试将 numpy 数组(长度超过 1)转换为浮点数时收到的错误相同:

>>> float(np.array([1, 2, 3]))
Traceback (most recent call last):
  File "<pyshell#14>", line 1, in <module>
    float(np.array([1, 2, 3]))
TypeError: only length-1 arrays can be converted to Python scalars

所以发生的事情是它试图将 polyfit 的输出分配给 float ndarray 的单个元素,但失败了,因为 polyfit 的输出是一个无法转换为数组的数组浮动。

这可能是 "fixed" 通过使 output 具有 dtype 对象,但这会减慢速度。

我认为您必须考虑 rolling_apply 仅可用于 return 单个浮点数的函数。要支持非标量输出,您必须滚动(har har)您自己的 rolling_apply.

版本

我想创建一个 IIR 滤波器来扩展时间序列。 例如:[1,2,3,4,5] 和 window = 2 应该给出:[[1,2], [2,3], [3,4], [4,5]]

这是我基于一些糟糕的编码实践的解决方案,但可以完成工作。 Return 从 rolling_apply() 到全局 dict/array 的索引并丢弃 return 值。当 rolling_apply returns.

时,解决方案已在字典中准备就绪
import pandas as pd
import numpy as np

dataDict = dict()
INDEX = 0

def windowFunc(w):
  global INDEX
  global dataDict
  dataDict[INDEX] = np.copy(w)
  INDEX = INDEX + 1
  return INDEX

dd = pd.DataFrame([1,2,3,4,5,6,7,8,9,0])
dd2 = pd.rolling_apply(dd, window=2, func = windowFunc)
print(list(dataDict.values()))

我遇到了同样的问题,您可以将 [0] 添加到 lambda 函数中:

coefs = pd.rolling_apply(my_ts, 21, lambda x: np.polyfit(range(len(x)), x, 3)[0])

现在可以正常使用了。

由于 rolling_apply 已被弃用,Nissar 的解决方案也适用于 pd.rolling.apply 方法:

coefs = my_ts.rolling(21).apply(lambda x: np.polyfit(range(len(x)), x, 3)[0])    

这里特别重要的是 Nissar 使用 range(len(x)) 来满足时间分量,这避免了无法将 rolling.apply 与两列或系列的 lambda 函数一起使用(因为某些占位符x(时间)分量的计数通常位于数据框或另一个系列的另一列中)。