Pandas expanding/rolling window 与 p 值的相关性计算
Pandas expanding/rolling window correlation calculation with p-value
假设我有一个 DataFrame,我想在其上计算两列之间的滚动或扩展 Pearson 相关性
import numpy as np
import pandas as pd
import scipy.stats as st
df = pd.DataFrame({'x': np.random.rand(10000), 'y': np.random.rand(10000)})
使用内置的 pandas
功能可以非常快速地计算这个
expanding_corr = df['x'].expanding(50).corr(df['y'])
rolling_corr = df['x'].rolling(50).corr(df['y'])
但是,如果我希望获得与这些相关性相关的 p 值,我能做的最好的事情就是定义一个自定义滚动函数并将 apply
传递给 groupby
对象
def custom_roll(df, w, **kwargs):
v = df.values
d0, d1 = v.shape
s0, s1 = v.strides
a = np.lib.stride_tricks.as_strided(v, (d0 - (w - 1), w, d1), (s0, s0, s1))
rolled_df = pd.concat({
row: pd.DataFrame(values, columns=df.columns)
for row, values in zip(df.index[(w-1):], a)
})
return rolled_df.groupby(level=0, **kwargs)
c_df = custom_roll(df, 50).apply(lambda df: st.pearsonr(df['x'], df['y']))
c_df
现在包含适当的相关性,重要的是它们相关的 p 值。
但是,与内置的 pandas
方法相比,此方法非常慢,这意味着它不适合,因为实际上我在优化过程中要计算这些相关性数千次。此外,我不确定如何扩展 custom_roll
函数来扩展 windows。
任何人都可以指出利用 numpy
以矢量化速度获得超过 windows 扩展的 p 值的方向吗?
我想不出在 pandas 中直接使用 rolling
执行此操作的巧妙方法,但请注意,您可以在给定相关系数的情况下计算 p-value。
Pearson 的相关系数遵循 Student 的 t-distribution,您可以通过将其代入由不完整的 beta 函数 scipy.special.betainc
定义的 cdf 来获得 p-value。这听起来很复杂,但可以用几行代码完成。下面是一个根据相关系数 corr
和样本大小 n
计算 p-value 的函数。它实际上是基于您一直在使用的 scipy's implementation。
import pandas as pd
from scipy.special import betainc
def pvalue(corr, n=50):
df = n - 2
t_squared = corr**2 * (df / ((1.0 - corr) * (1.0 + corr)))
prob = betainc(0.5*df, 0.5, df/(df+t_squared))
return prob
然后您可以将此函数应用于您已有的相关值。
rolling_corr = df['x'].rolling(50).corr(df['y'])
pvalue(rolling_corr)
它可能不是完美的矢量化 numpy 解决方案,但应该比一遍又一遍地计算相关性快数十倍。
方法 #1
corr2_coeff_rowwise
列出了如何在行之间进行 element-wise 关联。我们可以将其分解为两列之间 element-wise 相关性的情况。所以,我们最终会得到一个使用 corr2_coeff_rowwise
的循环。然后,我们将尝试对其进行矢量化,看看其中是否有可以矢量化的部分:
- 使用
mean
获取平均值。这可以使用统一过滤器进行矢量化。
- 接下来是获取这些平均值与输入数组中的滑动元素之间的差异。要移植到矢量化的,我们将使用
broadcasting
.
其余部分保持不变,以便从 pearsonr
获得两个输出中的第一个。
为了得到第二个输出,我们回到source code
。鉴于第一个系数输出,这应该是 straight-forward。
所以,考虑到这些,我们最终会得到这样的结果 -
import scipy.special as special
from scipy.ndimage import uniform_filter
def sliding_corr1(a,b,W):
# a,b are input arrays; W is window length
am = uniform_filter(a.astype(float),W)
bm = uniform_filter(b.astype(float),W)
amc = am[W//2:-W//2+1]
bmc = bm[W//2:-W//2+1]
da = a[:,None]-amc
db = b[:,None]-bmc
# Get sliding mask of valid windows
m,n = da.shape
mask1 = np.arange(m)[:,None] >= np.arange(n)
mask2 = np.arange(m)[:,None] < np.arange(n)+W
mask = mask1 & mask2
dam = (da*mask)
dbm = (db*mask)
ssAs = np.einsum('ij,ij->j',dam,dam)
ssBs = np.einsum('ij,ij->j',dbm,dbm)
D = np.einsum('ij,ij->j',dam,dbm)
coeff = D/np.sqrt(ssAs*ssBs)
n = W
ab = n/2 - 1
pval = 2*special.btdtr(ab, ab, 0.5*(1 - abs(np.float64(coeff))))
return coeff,pval
因此,要从 pandas 系列的输入中获得最终输出 -
out = sliding_corr1(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)
方法 #2
与 Approach #1
非常相似,但我们将使用 numba
来提高内存效率,以取代之前方法中的步骤 #2。
from numba import njit
import math
@njit(parallel=True)
def sliding_corr2_coeff(a,b,amc,bmc):
L = len(a)-W+1
out00 = np.empty(L)
for i in range(L):
out_a = 0
out_b = 0
out_D = 0
for j in range(W):
d_a = a[i+j]-amc[i]
d_b = b[i+j]-bmc[i]
out_D += d_a*d_b
out_a += d_a**2
out_b += d_b**2
out00[i] = out_D/math.sqrt(out_a*out_b)
return out00
def sliding_corr2(a,b,W):
am = uniform_filter(a.astype(float),W)
bm = uniform_filter(b.astype(float),W)
amc = am[W//2:-W//2+1]
bmc = bm[W//2:-W//2+1]
coeff = sliding_corr2_coeff(a,b,amc,bmc)
ab = W/2 - 1
pval = 2*special.btdtr(ab, ab, 0.5*(1 - abs(np.float64(coeff))))
return coeff,pval
方法 #3
与上一个非常相似,除了我们将所有系数工作推到 numba
-
@njit(parallel=True)
def sliding_corr3_coeff(a,b,W):
L = len(a)-W+1
out00 = np.empty(L)
for i in range(L):
a_mean = 0.0
b_mean = 0.0
for j in range(W):
a_mean += a[i+j]
b_mean += b[i+j]
a_mean /= W
b_mean /= W
out_a = 0
out_b = 0
out_D = 0
for j in range(W):
d_a = a[i+j]-a_mean
d_b = b[i+j]-b_mean
out_D += d_a*d_b
out_a += d_a*d_a
out_b += d_b*d_b
out00[i] = out_D/math.sqrt(out_a*out_b)
return out00
def sliding_corr3(a,b,W):
coeff = sliding_corr3_coeff(a,b,W)
ab = W/2 - 1
pval = 2*special.btdtr(ab, ab, 0.5*(1 - np.abs(coeff)))
return coeff,pval
计时 -
In [181]: df = pd.DataFrame({'x': np.random.rand(10000), 'y': np.random.rand(10000)})
In [182]: %timeit sliding_corr2(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)
100 loops, best of 3: 5.05 ms per loop
In [183]: %timeit sliding_corr3(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)
100 loops, best of 3: 5.51 ms per loop
注:
sliding_corr1
似乎在这个数据集上花费了很长时间,很可能是因为其第 2 步中的 memory-requirement。
使用numba函数后的瓶颈,然后转移到special.btdtr
的p-val计算。
假设我有一个 DataFrame,我想在其上计算两列之间的滚动或扩展 Pearson 相关性
import numpy as np
import pandas as pd
import scipy.stats as st
df = pd.DataFrame({'x': np.random.rand(10000), 'y': np.random.rand(10000)})
使用内置的 pandas
功能可以非常快速地计算这个
expanding_corr = df['x'].expanding(50).corr(df['y'])
rolling_corr = df['x'].rolling(50).corr(df['y'])
但是,如果我希望获得与这些相关性相关的 p 值,我能做的最好的事情就是定义一个自定义滚动函数并将 apply
传递给 groupby
对象
def custom_roll(df, w, **kwargs):
v = df.values
d0, d1 = v.shape
s0, s1 = v.strides
a = np.lib.stride_tricks.as_strided(v, (d0 - (w - 1), w, d1), (s0, s0, s1))
rolled_df = pd.concat({
row: pd.DataFrame(values, columns=df.columns)
for row, values in zip(df.index[(w-1):], a)
})
return rolled_df.groupby(level=0, **kwargs)
c_df = custom_roll(df, 50).apply(lambda df: st.pearsonr(df['x'], df['y']))
c_df
现在包含适当的相关性,重要的是它们相关的 p 值。
但是,与内置的 pandas
方法相比,此方法非常慢,这意味着它不适合,因为实际上我在优化过程中要计算这些相关性数千次。此外,我不确定如何扩展 custom_roll
函数来扩展 windows。
任何人都可以指出利用 numpy
以矢量化速度获得超过 windows 扩展的 p 值的方向吗?
我想不出在 pandas 中直接使用 rolling
执行此操作的巧妙方法,但请注意,您可以在给定相关系数的情况下计算 p-value。
Pearson 的相关系数遵循 Student 的 t-distribution,您可以通过将其代入由不完整的 beta 函数 scipy.special.betainc
定义的 cdf 来获得 p-value。这听起来很复杂,但可以用几行代码完成。下面是一个根据相关系数 corr
和样本大小 n
计算 p-value 的函数。它实际上是基于您一直在使用的 scipy's implementation。
import pandas as pd
from scipy.special import betainc
def pvalue(corr, n=50):
df = n - 2
t_squared = corr**2 * (df / ((1.0 - corr) * (1.0 + corr)))
prob = betainc(0.5*df, 0.5, df/(df+t_squared))
return prob
然后您可以将此函数应用于您已有的相关值。
rolling_corr = df['x'].rolling(50).corr(df['y'])
pvalue(rolling_corr)
它可能不是完美的矢量化 numpy 解决方案,但应该比一遍又一遍地计算相关性快数十倍。
方法 #1
corr2_coeff_rowwise
列出了如何在行之间进行 element-wise 关联。我们可以将其分解为两列之间 element-wise 相关性的情况。所以,我们最终会得到一个使用 corr2_coeff_rowwise
的循环。然后,我们将尝试对其进行矢量化,看看其中是否有可以矢量化的部分:
- 使用
mean
获取平均值。这可以使用统一过滤器进行矢量化。 - 接下来是获取这些平均值与输入数组中的滑动元素之间的差异。要移植到矢量化的,我们将使用
broadcasting
.
其余部分保持不变,以便从 pearsonr
获得两个输出中的第一个。
为了得到第二个输出,我们回到source code
。鉴于第一个系数输出,这应该是 straight-forward。
所以,考虑到这些,我们最终会得到这样的结果 -
import scipy.special as special
from scipy.ndimage import uniform_filter
def sliding_corr1(a,b,W):
# a,b are input arrays; W is window length
am = uniform_filter(a.astype(float),W)
bm = uniform_filter(b.astype(float),W)
amc = am[W//2:-W//2+1]
bmc = bm[W//2:-W//2+1]
da = a[:,None]-amc
db = b[:,None]-bmc
# Get sliding mask of valid windows
m,n = da.shape
mask1 = np.arange(m)[:,None] >= np.arange(n)
mask2 = np.arange(m)[:,None] < np.arange(n)+W
mask = mask1 & mask2
dam = (da*mask)
dbm = (db*mask)
ssAs = np.einsum('ij,ij->j',dam,dam)
ssBs = np.einsum('ij,ij->j',dbm,dbm)
D = np.einsum('ij,ij->j',dam,dbm)
coeff = D/np.sqrt(ssAs*ssBs)
n = W
ab = n/2 - 1
pval = 2*special.btdtr(ab, ab, 0.5*(1 - abs(np.float64(coeff))))
return coeff,pval
因此,要从 pandas 系列的输入中获得最终输出 -
out = sliding_corr1(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)
方法 #2
与 Approach #1
非常相似,但我们将使用 numba
来提高内存效率,以取代之前方法中的步骤 #2。
from numba import njit
import math
@njit(parallel=True)
def sliding_corr2_coeff(a,b,amc,bmc):
L = len(a)-W+1
out00 = np.empty(L)
for i in range(L):
out_a = 0
out_b = 0
out_D = 0
for j in range(W):
d_a = a[i+j]-amc[i]
d_b = b[i+j]-bmc[i]
out_D += d_a*d_b
out_a += d_a**2
out_b += d_b**2
out00[i] = out_D/math.sqrt(out_a*out_b)
return out00
def sliding_corr2(a,b,W):
am = uniform_filter(a.astype(float),W)
bm = uniform_filter(b.astype(float),W)
amc = am[W//2:-W//2+1]
bmc = bm[W//2:-W//2+1]
coeff = sliding_corr2_coeff(a,b,amc,bmc)
ab = W/2 - 1
pval = 2*special.btdtr(ab, ab, 0.5*(1 - abs(np.float64(coeff))))
return coeff,pval
方法 #3
与上一个非常相似,除了我们将所有系数工作推到 numba
-
@njit(parallel=True)
def sliding_corr3_coeff(a,b,W):
L = len(a)-W+1
out00 = np.empty(L)
for i in range(L):
a_mean = 0.0
b_mean = 0.0
for j in range(W):
a_mean += a[i+j]
b_mean += b[i+j]
a_mean /= W
b_mean /= W
out_a = 0
out_b = 0
out_D = 0
for j in range(W):
d_a = a[i+j]-a_mean
d_b = b[i+j]-b_mean
out_D += d_a*d_b
out_a += d_a*d_a
out_b += d_b*d_b
out00[i] = out_D/math.sqrt(out_a*out_b)
return out00
def sliding_corr3(a,b,W):
coeff = sliding_corr3_coeff(a,b,W)
ab = W/2 - 1
pval = 2*special.btdtr(ab, ab, 0.5*(1 - np.abs(coeff)))
return coeff,pval
计时 -
In [181]: df = pd.DataFrame({'x': np.random.rand(10000), 'y': np.random.rand(10000)})
In [182]: %timeit sliding_corr2(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)
100 loops, best of 3: 5.05 ms per loop
In [183]: %timeit sliding_corr3(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)
100 loops, best of 3: 5.51 ms per loop
注:
sliding_corr1
似乎在这个数据集上花费了很长时间,很可能是因为其第 2 步中的 memory-requirement。使用numba函数后的瓶颈,然后转移到
special.btdtr
的p-val计算。