Group by value within range in Azure Databricks
Consider the following data:
EventDate,Value
1.1.2019,11
1.2.2019,5
1.3.2019,6
1.4.2019,-15
1.5.2019,-20
1.6.2019,-30
1.7.2019,12
1.8.2019,20
I want to create groups where the values fall within these threshold ranges:
1. > 10
2. <=10 >=-10
3. < -10
The result should be the start and end of each run of values in a given state:
1.1.2019, 1.1.2019, [11]
1.2.2019, 1.3.2019, [5, 6]
1.4.2019, 1.6.2019, [-15, -20, -30]
1.7.2019, 1.8.2019, [12, 20]
I believe the answer lies in window functions, but I'm new to Databricks and don't yet understand how to use them.
Here is a working (Python) solution based on looping over the dataframe as a list, but I would prefer a solution that works directly on the dataframe for performance.
# Collect the sorted dataframe to the driver and build the runs ("cycles") in plain Python
STATETHRESHOLDCHARGE = 10

rows = [{"eventDateTime": x["EventDate"], "value": x["Value"]}
        for x in dataframe.sort("EventDate").rdd.collect()]

cycles = []
previous = None

for row in rows:
    # classify the value against the charge threshold
    currentState = 'charge'
    if -STATETHRESHOLDCHARGE < row["value"] < STATETHRESHOLDCHARGE:
        currentState = 'idle'
    if row["value"] <= -STATETHRESHOLDCHARGE:
        currentState = 'discharge'

    if previous is None or previous["state"] != currentState:
        # state changed: open a new cycle
        previous = {"start": row["eventDateTime"], "end": row["eventDateTime"],
                    "values": [row["value"]], "timestamps": [row["eventDateTime"]],
                    "state": currentState}
        cycles.append(previous)
    else:
        # same state: extend the current cycle
        previous["end"] = row["eventDateTime"]
        previous["values"].append(row["value"])
        previous["timestamps"].append(row["eventDateTime"])

display(cycles)
I created a csv file with Pandas in Python to test my sample code. The contents of test.csv are as follows.
A,B
1.1.2019,11
1.2.2019,5
1.3.2019,6
1.4.2019,-15
1.5.2019,-20
1.6.2019,-30
1.7.2019,12
1.8.2019,20
Because of the limitation that a pandas.IntervalIndex cannot be constructed from pandas.Interval objects with different closed values (right, left, both, neither), I converted the threshold groups you defined into equivalent right-closed groups, as shown in the table below.
Your defined groups           | The equivalent groups
1. > 10       : (10, inf]     | > 10       : (10, inf]
2. <=10 >=-10 : [-10, 10]     | <=10 > -11 : (-11, 10]
3. < -10      : (-inf, -10)   | <= -11     : (-inf, -11]
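As a quick aside (not part of the answer's code): once all intervals share the same closed side, pandas will happily build an IntervalIndex from them, which is exactly what the right-closed equivalents above achieve. A minimal sketch using the same breaks as the bins below:

import numpy as np
import pandas as pd

# All three bins are closed on the right, so together they form a valid IntervalIndex
bins_idx = pd.IntervalIndex.from_breaks([-np.inf, -11, 10, np.inf], closed='right')
print(bins_idx)   # (-inf, -11.0], (-11.0, 10.0], (10.0, inf]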
Here is my sample code.
import pandas as pd
import numpy as np
df = pd.read_csv('test.csv')
# print(df)
bins = np.array([-np.inf, -11, 10, np.inf])
"""
# Can not construct IntervalIndex with Intervals with different closed
>>> left = pd.Interval(left=-np.Inf, right=-10, closed='neither')
>>> center = pd.Interval(left=-10, right=10, closed='both')
>>> right = pd.Interval(left=10, right=np.Inf, closed='neither')
>>> pd.IntervalIndex([left, center, right])
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "\lib\site-packages\pandas\core\indexes\interval.py", line 151, in __new__
verify_integrity=verify_integrity)
File "\lib\site-packages\pandas\core\arrays\interval.py", line 157, in __new__
data, validate_closed=closed is None)
File "pandas\_libs\interval.pyx", line 483, in pandas._libs.interval.intervals_to_interval_bounds
ValueError: intervals must all be closed on the same side
"""
# print(bins)
labels = ['left', 'center', 'right']
by = pd.cut(df['B'], bins, labels=labels)
# print(by)
groupby = df.groupby(by)
result = pd.DataFrame(
    [(groupby['A'].min()[label],
      groupby['A'].max()[label],
      df['B'].take(groupby.indices.get(label)).values)
     for label in labels],
    index=['<-10', '>=-10 <=10', '>10'],
    columns=['min_A', 'max_A', 'Bs'])
print(result)
The result is as follows:
min_A max_A Bs
<-10 1.4.2019 1.6.2019 [-15, -20, -30]
>=-10 <=10 1.2.2019 1.3.2019 [5, 6]
>10 1.1.2019 1.8.2019 [11, 12, 20]
Assuming you have the above data in the df dataframe, let's go through it piece by piece.
from pyspark.sql.functions import col, last, lag, udf, when, collect_list
from pyspark.sql.types import StringType
value = 'value'
date = 'EventDate'
valueBag = 'valueBag'
def bagTransform(v):
    if v > 10:
        return 'charging'
    elif v < -10:
        return 'discharging'
    else:
        return 'idle'

bagTransformUDF = udf(bagTransform, StringType())
withBaggedValue = df.withColumn(valueBag, bagTransformUDF(col(value)))
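As a side note (not from the original answer): the same bucketing can also be written with built-in when/otherwise expressions, which avoids the overhead of a Python UDF. A minimal, equivalent sketch using the column names defined above:

from pyspark.sql.functions import col, when

# Same bucketing expressed with built-in expressions instead of a Python UDF;
# Spark can optimize this and no Python serialization is involved.
withBaggedValue = df.withColumn(
    valueBag,
    when(col(value) > 10, 'charging')
    .when(col(value) < -10, 'discharging')
    .otherwise('idle'))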
So first we bucket the values into the ranges you declared; now we can use lag to shift a window onto the previous value:
from pyspark.sql import Window
windowSpec = Window.orderBy(date)
prevValueBag = 'prevValueBag'
bagBeginning = 'bagBeginning'
withLag = (withBaggedValue
           .withColumn(prevValueBag, lag(withBaggedValue[valueBag]).over(windowSpec)))
Now the interesting part begins: we detect the change points and provisionally assign either the current event date or null:
withInitialBeginnings = withLag.withColumn(
    bagBeginning,
    when((col(prevValueBag) != col(valueBag)) | col(prevValueBag).isNull(), col(date))
    .otherwise(None))
and fill them in with the last value found:
withFilledBeginnings = (withInitialBeginnings
                        .withColumn(bagBeginning,
                                    last(col(bagBeginning), ignorenulls=True)
                                    .over(windowSpec)))
display(withFilledBeginnings)
With that in place, we can simply aggregate over the starting points:
aggregate = withFilledBeginnings.groupby(col(bagBeginning)).agg(collect_list(value))
display(aggregate)
If you also need the end date of each state, you can do similar preprocessing with pyspark.sql.functions.lead, which works symmetrically to lag but in the forward direction; a sketch of this follows below.
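For concreteness, here is a minimal sketch of that forward-direction preprocessing (not part of the original answer; the helper names nextValueBag, bagEnd, and forwardWindow are made up for illustration), building on withFilledBeginnings from above:

from pyspark.sql import Window
from pyspark.sql.functions import col, first, lead, when, collect_list

nextValueBag = 'nextValueBag'   # hypothetical helper column names
bagEnd = 'bagEnd'

# frame that looks from the current row forward to the end of the data
forwardWindow = Window.orderBy(date).rowsBetween(Window.currentRow,
                                                 Window.unboundedFollowing)

withEnds = (withFilledBeginnings
            # bag of the next row (null on the very last row)
            .withColumn(nextValueBag, lead(col(valueBag)).over(Window.orderBy(date)))
            # a row closes its bag when the next row belongs to a different bag or doesn't exist
            .withColumn(bagEnd, when((col(nextValueBag) != col(valueBag))
                                     | col(nextValueBag).isNull(), col(date)))
            # fill backwards: take the first non-null bagEnd at or after the current row
            .withColumn(bagEnd, first(col(bagEnd), ignorenulls=True).over(forwardWindow)))

display(withEnds
        .groupby(col(bagBeginning), col(bagEnd))
        .agg(collect_list(value)))

Grouping by both bagBeginning and bagEnd then gives one row per run with its start, end, and list of values, which matches the shape requested in the question.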