pandas - Count streak of values higher/lower than current row
I am looking for a way to take a pandas Series and return a new Series giving, for each row, the number of consecutive prior values that the row is higher/lower than:
a = pd.Series([30, 10, 20, 25, 35, 15])
...should output:
Value  Higher than streak  Lower than streak
30     0                   0
10     0                   1
20     1                   0
25     2                   0
35     4                   0
15     0                   3
This would let someone determine how significant each "regional max/min" value is within a time series.
Thanks in advance.
import pandas as pd
import numpy as np
value = pd.Series([30, 10, 20, 25, 35, 15])
Lower=[(value[x]<value[:x]).sum() for x in range(len(value))]
Higher=[(value[x]>value[:x]).sum() for x in range(len(value))]
df=pd.DataFrame({"value":value,"Higher":Higher,"Lower":Lower})
print(df)
Lower Higher value
0 0 0 30
1 1 0 10
2 1 1 20
3 1 2 25
4 0 4 35
5 4 1 15
Edit: Updated to actually count consecutive values. I couldn't think of a workable pure-pandas solution, so we're back to looping.
df = pd.Series(np.random.rand(10000))

def count_bigger_consecutives(values):
    length = len(values)
    result = np.zeros(length)
    for i in range(length):
        # walk backwards from the row just before i until a value is not smaller
        for j in range(i - 1, -1, -1):
            if values[i] > values[j]:
                result[i] += 1
            else:
                break
    return result
%timeit count_bigger_consecutives(df.values)
1 loop, best of 3: 365 ms per loop
If you care about performance, you can get a speed-up with numba, a just-in-time compiler for Python code. This is an example where numba really shines:
from numba import jit

@jit(nopython=True)
def numba_count_bigger_consecutives(values):
    length = len(values)
    result = np.zeros(length)
    for i in range(length):
        # walk backwards from the row just before i until a value is not smaller
        for j in range(i - 1, -1, -1):
            if values[i] > values[j]:
                result[i] += 1
            else:
                break
    return result
%timeit numba_count_bigger_consecutives(df.values)
The slowest run took 543.09 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 161 µs per loop
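The nested loop above is quadratic in the worst case (a steadily rising series makes every row scan all the way back to the start). As a possible alternative, not part of the original answer, here is a minimal sketch using a monotonic stack (the classic "stock span" technique), which should give the same counts in a single pass. The name `streak_higher` is made up for illustration, and strict comparison is assumed, so a tie ends the streak.

```python
import numpy as np

def streak_higher(values):
    """For each position, count the consecutive prior values strictly lower than it."""
    values = np.asarray(values)
    result = np.zeros(len(values), dtype=np.int64)
    stack = []  # indices of earlier values not yet exceeded by any later value
    for i, v in enumerate(values):
        # Drop every earlier value that the current one beats.
        while stack and values[stack[-1]] < v:
            stack.pop()
        # The nearest earlier value that is >= v (if any) is where the streak stops.
        result[i] = i - stack[-1] - 1 if stack else i
        stack.append(i)
    return result

a = np.array([30, 10, 20, 25, 35, 15])
higher = streak_higher(a)   # how many prior rows the current row is higher than
lower = streak_higher(-a)   # negating the data flips the comparison
print(higher, lower)
```

Each index is pushed and popped at most once, so the work grows linearly with the length of the series regardless of how long the streaks are.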
Here is a solution a colleague came up with (probably not the most efficient, but it works):
Input data
a = pd.Series([30, 10, 20, 25, 35, 15])
Create the 'higher' column
b = []
for idx, value in enumerate(a):
    count = 0
    # step backwards through the earlier rows until one is larger
    for i in range(idx, 0, -1):
        if value < a.loc[i-1]:
            break
        count += 1
    b.append([value, count])
higher = pd.DataFrame(b, columns=['Value', 'Higher'])
Create the 'lower' column
c = []
for idx, value in enumerate(a):
    count = 0
    # step backwards through the earlier rows until one is smaller
    for i in range(idx, 0, -1):
        if value > a.loc[i-1]:
            break
        count += 1
    c.append([value, count])
lower = pd.DataFrame(c, columns=['Value', 'Lower'])
Merge the two new series
print(pd.merge(higher, lower, on='Value'))
Value Higher Lower
0 30 0 0
1 10 0 1
2 20 1 0
3 25 2 0
4 35 4 0
5 15 0 3
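One thing to watch in the code above: `pd.merge(..., on='Value')` matches rows by value, so a series containing repeated values would come back with extra rows. Below is a minimal sketch that folds the two loops into one helper and places the columns side by side instead. `count_streak` is a hypothetical name, and strict comparison is assumed here (a tie ends the streak), which differs slightly from the loops above, where a tie keeps the count going.

```python
from operator import gt, lt

import pandas as pd

def count_streak(series, op):
    """Count, for each row, the consecutive prior rows for which op(current, prior) holds."""
    values = series.tolist()
    counts = []
    for idx, value in enumerate(values):
        count = 0
        # Walk backwards from the row just before idx until the comparison fails.
        for prior in reversed(values[:idx]):
            if not op(value, prior):
                break
            count += 1
        counts.append(count)
    return pd.Series(counts, index=series.index)

a = pd.Series([30, 10, 20, 25, 35, 15, 15])  # note the repeated 15
res = pd.DataFrame({
    'Value': a,
    'Higher': count_streak(a, gt),
    'Lower': count_streak(a, lt),
})
print(res)
```

Because the columns are aligned on the index rather than joined on the values, the result keeps exactly one row per input row.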
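Here is my solution - it has a loop, but the number of iterations is only as large as the longest streak. It keeps track of whether each row's streak has been fully counted and stops once all of them are done. It uses shift to test whether the previous row is higher/lower, and keeps increasing the shift until all streaks have been found.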
import numpy as np
import pandas as pd

a = pd.Series([30, 10, 20, 25, 35, 15, 15])
# per-row flags: True while that row's streak is still being extended
a_not_done_greater = pd.Series(np.ones(len(a))).astype(bool)
a_not_done_less = pd.Series(np.ones(len(a))).astype(bool)
a_streak_greater = pd.Series(np.zeros(len(a))).astype(int)
a_streak_less = pd.Series(np.zeros(len(a))).astype(int)
s = 1
not_done_greater = True
not_done_less = True
while not_done_greater or not_done_less:
    if not_done_greater:
        # compare each row with the row s positions earlier
        a_greater_than_shift = (a > a.shift(s))
        a_streak_greater = a_streak_greater + (a_not_done_greater.astype(int) * a_greater_than_shift)
        a_not_done_greater = a_not_done_greater & a_greater_than_shift
        not_done_greater = a_not_done_greater.any()
    if not_done_less:
        a_less_than_shift = (a < a.shift(s))
        a_streak_less = a_streak_less + (a_not_done_less.astype(int) * a_less_than_shift)
        a_not_done_less = a_not_done_less & a_less_than_shift
        not_done_less = a_not_done_less.any()
    s = s + 1
res = pd.concat([a, a_streak_greater, a_streak_less], axis=1)
res.columns = ['value', 'greater_than_streak', 'less_than_streak']
print(res)
This gives the DataFrame:
value greater_than_streak less_than_streak
0 30 0 0
1 10 0 1
2 20 1 0
3 25 2 0
4 35 4 0
5 15 0 3
6 15 0 0
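The same idea can be packaged as a function that takes the comparison as a parameter, so one loop body serves both directions. This is only a sketch: `streak_by_shift` is a made-up name, and it relies on comparisons against the NaN values introduced by `shift` evaluating to False, which is what ends the loop once the shift runs past the start of the series.

```python
from operator import gt, lt

import pandas as pd

def streak_by_shift(a, op):
    """Count consecutive prior rows r for which op(current, r) holds, using repeated shifts."""
    streak = pd.Series(0, index=a.index)
    active = pd.Series(True, index=a.index)  # rows whose streak is still growing
    s = 1
    while active.any():
        hit = op(a, a.shift(s))       # compare each row with the row s positions earlier
        active = active & hit         # a row stays active only while every comparison so far held
        streak = streak + active.astype(int)
        s += 1
    return streak

a = pd.Series([30, 10, 20, 25, 35, 15, 15])
greater = streak_by_shift(a, gt)
less = streak_by_shift(a, lt)
print(pd.DataFrame({'value': a, 'greater_than_streak': greater, 'less_than_streak': less}))
```

The loop runs once more than the longest streak, since the final pass is the one that finds no row still active.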
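Since you are looking back at previous values to see whether they form a consecutive run, you will have to interact with the index in some way. This solution first compares every value before the current index against the current value, to see whether each is lower or higher, and then discards any matches that are followed by a False. It also avoids creating an iterator over the DataFrame, which may speed things up on larger data sets.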
import pandas as pd
from operator import gt, lt

a = pd.Series([30, 10, 20, 25, 35, 15])

def consecutive_run(op, ser, i):
    """
    Sum the uninterrupted consecutive runs at index i in the series where the previous data
    was true according to the operator.
    """
    # op is applied as op(previous values, current value)
    thresh_all = op(ser[:i], ser[i])
    # find the positions where the comparison failed
    non_passing = thresh_all[~thresh_all]
    start_idx = 0
    if not non_passing.empty:
        # if there was a failure, there was a break in the consecutive truth values,
        # so get the final False position. Starting index will be False, but it
        # will either be at the end of the series selection and will sum to zero
        # or will be followed by all successive True values afterwards
        start_idx = non_passing.index[-1]
    # count the consecutive runs by summing from the start index onwards
    return thresh_all[start_idx:].sum()

# lt: previous values lower than the current one, i.e. the current value is higher
# gt: previous values higher than the current one, i.e. the current value is lower
res = pd.concat([a, a.index.to_series().map(lambda i: consecutive_run(lt, a, i)),
                 a.index.to_series().map(lambda i: consecutive_run(gt, a, i))],
                axis=1)
res.columns = ['Value', 'Higher than streak', 'Lower than streak']
print(res)
Result:
   Value  Higher than streak  Lower than streak
0     30                   0                  0
1     10                   0                  1
2     20                   1                  0
3     25                   2                  0
4     35                   4                  0
5     15                   0                  3
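The "last False position" idea can also be expressed without summing: the streak length is just the distance between the current position and the last earlier position where the comparison failed. Here is a minimal sketch on a plain NumPy array, assuming positional indexing; `run_length_before` is a hypothetical helper, not part of the answer above.

```python
from operator import gt, lt

import numpy as np

def run_length_before(values, i, op):
    """Length of the unbroken run ending just before position i where op(prior, current) holds."""
    passed = op(values[:i], values[i])
    failures = np.flatnonzero(~passed)
    if failures.size == 0:
        return i  # every earlier value passed (or there are no earlier values)
    return i - 1 - int(failures[-1])

values = np.array([30, 10, 20, 25, 35, 15])
# op receives (earlier values, current value):
#   lt -> earlier values are lower, so the current value is higher
#   gt -> earlier values are higher, so the current value is lower
higher = [run_length_before(values, i, lt) for i in range(len(values))]
lower = [run_length_before(values, i, gt) for i in range(len(values))]
print(higher)
print(lower)
```

This still does one vectorized comparison per row, so it shares the quadratic cost of the approach above; it only makes the role of the last failing position explicit.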