如何像 pyspark 中的结果那样将字符串聚合到字典中?
How to aggregate string to dictionary like results in pyspark?
我有一个数据框,我想每天汇总。
data = [
(125, '2012-10-10','good'),
(20, '2012-10-10','good'),
(40, '2012-10-10','bad'),
(60, '2012-10-10','NA')]
df = spark.createDataFrame(data, ["temperature", "date","performance"])
我可以使用 spark 内置函数(如 max、min、avg)来聚合数值。我如何聚合字符串?
我希望是这样的:
date
max_temp
min_temp
performance_frequency
2012-10-10
125
20
"good": 2, "bad":1, "NA":1
我们可以使用 MapType 和 UDF with Counter 来 return 值计数,
from pyspark.sql import functions as F
from pyspark.sql.types import MapType,StringType,IntegerType
from collections import Counter
data = [(125, '2012-10-10','good'),(20, '2012-10-10','good'),(40, '2012-10-10','bad'),(60, '2012-10-10','NA')]
df = spark.createDataFrame(data, ["temperature", "date","performance"])
udf1 = F.udf(lambda x: dict(Counter(x)),MapType(StringType(),IntegerType()))
df.groupby('date').agg(F.min('temperature'),F.max('temperature'),udf1(F.collect_list('performance')).alias('performance_frequency')).show(1,False)
+----------+----------------+----------------+---------------------------------+
|date |min(temperature)|max(temperature)|performance_frequency |
+----------+----------------+----------------+---------------------------------+
|2012-10-10|20 |125 |Map(NA -> 1, bad -> 1, good -> 2)|
+----------+----------------+----------------+---------------------------------+
df.groupby('date').agg(F.min('temperature'),F.max('temperature'),udf1(F.collect_list('performance')).alias('performance_frequency')).collect()
[Row(date='2012-10-10', min(temperature)=20, max(temperature)=125, performance_frequency={'bad': 1, 'good': 2, 'NA': 1})]
希望对您有所帮助!
我有一个数据框,我想每天汇总。
data = [
(125, '2012-10-10','good'),
(20, '2012-10-10','good'),
(40, '2012-10-10','bad'),
(60, '2012-10-10','NA')]
df = spark.createDataFrame(data, ["temperature", "date","performance"])
我可以使用 spark 内置函数(如 max、min、avg)来聚合数值。我如何聚合字符串?
我希望是这样的:
date | max_temp | min_temp | performance_frequency |
---|---|---|---|
2012-10-10 | 125 | 20 | "good": 2, "bad":1, "NA":1 |
我们可以使用 MapType 和 UDF with Counter 来 return 值计数,
from pyspark.sql import functions as F
from pyspark.sql.types import MapType,StringType,IntegerType
from collections import Counter
data = [(125, '2012-10-10','good'),(20, '2012-10-10','good'),(40, '2012-10-10','bad'),(60, '2012-10-10','NA')]
df = spark.createDataFrame(data, ["temperature", "date","performance"])
udf1 = F.udf(lambda x: dict(Counter(x)),MapType(StringType(),IntegerType()))
df.groupby('date').agg(F.min('temperature'),F.max('temperature'),udf1(F.collect_list('performance')).alias('performance_frequency')).show(1,False)
+----------+----------------+----------------+---------------------------------+
|date |min(temperature)|max(temperature)|performance_frequency |
+----------+----------------+----------------+---------------------------------+
|2012-10-10|20 |125 |Map(NA -> 1, bad -> 1, good -> 2)|
+----------+----------------+----------------+---------------------------------+
df.groupby('date').agg(F.min('temperature'),F.max('temperature'),udf1(F.collect_list('performance')).alias('performance_frequency')).collect()
[Row(date='2012-10-10', min(temperature)=20, max(temperature)=125, performance_frequency={'bad': 1, 'good': 2, 'NA': 1})]
希望对您有所帮助!