Spark window 函数按行中出现频率最高的值聚合
Spark window function aggregate by most frequent value in row
我想在给定的 window 中获取连续出现频率最高的字符串,并将此值放在新的一行中。 (我正在使用 Pyspark)
这就是我的 table 的样子。
window label value
123 a 54
123 a 45
123 a 21
123 b 99
123 b 78
我正在做一些聚合,目前我正在按 window
和 label
分组。
sqlContext.sql(SELECT avg(value) as avgValue FROM table GROUP BY window, label)
这个 returns window = 123 and label = a 的平均值和 window = 123 and label = b
的平均值
我想做的是按最常出现的字符串降序排列 label
,因此在我的 sql 语句中我可以执行 SELECT first(label) as majLabel, avg(value) as avgValue FROM table GROUP BY window
我正在尝试在 window 函数中执行此操作,但还没有完全实现。
group = ["window"]
w = (Window().partitionBy(*group))
df = spark.createDataFrame([['123','a','54'],['123','a','45'],['123','a','21'],['123','b','99'],['123','b','78'],],['window','label','value'])
定义一个正确的WindowSpec。
win_spec = window.partitionBy(['window','label']).orderBy(col('value').desc())
Returns window 分区内从 1 开始的序号 ['window','label'].
str_rank = df.withColumn('string_rank',row_number().over(win_spec))
str_rank.show()
这是 df 现在的样子:
Select window 和 "string_rank" == 1.
str_rank.where(col('string_rank')==1).drop('string_rank').show()
我想在给定的 window 中获取连续出现频率最高的字符串,并将此值放在新的一行中。 (我正在使用 Pyspark)
这就是我的 table 的样子。
window label value
123 a 54
123 a 45
123 a 21
123 b 99
123 b 78
我正在做一些聚合,目前我正在按 window
和 label
分组。
sqlContext.sql(SELECT avg(value) as avgValue FROM table GROUP BY window, label)
这个 returns window = 123 and label = a 的平均值和 window = 123 and label = b
的平均值我想做的是按最常出现的字符串降序排列 label
,因此在我的 sql 语句中我可以执行 SELECT first(label) as majLabel, avg(value) as avgValue FROM table GROUP BY window
我正在尝试在 window 函数中执行此操作,但还没有完全实现。
group = ["window"]
w = (Window().partitionBy(*group))
df = spark.createDataFrame([['123','a','54'],['123','a','45'],['123','a','21'],['123','b','99'],['123','b','78'],],['window','label','value'])
定义一个正确的WindowSpec。
win_spec = window.partitionBy(['window','label']).orderBy(col('value').desc())
Returns window 分区内从 1 开始的序号 ['window','label'].
str_rank = df.withColumn('string_rank',row_number().over(win_spec))
str_rank.show()
这是 df 现在的样子:
Select window 和 "string_rank" == 1.
str_rank.where(col('string_rank')==1).drop('string_rank').show()