如何在 Spark 中收集和处理列式数据
How to collect and process column-wise data in Spark
我有一个包含 7 天 24 小时数据的数据框,所以它有 144 列。
id d1h1 d1h2 d1h3 ..... d7h24
aaa 21 24 8 ..... 14
bbb 16 12 2 ..... 4
ccc 21 2 7 ..... 6
我想做的是找出每天的最大 3 个值:
id d1 d2 d3 .... d7
aaa [22,2,2] [17,2,2] [21,8,3] [32,11,2]
bbb [32,22,12] [47,22,2] [31,14,3] [32,11,2]
ccc [12,7,4] [28,14,7] [11,2,1] [19,14,7]
定义模式:
val p = "^(d[1-7])h[0-9]{1,2}$".r
分组列:
import org.apache.spark.sql.functions._
val cols = df.columns.tail
.groupBy { case p(d) => d }
.map { case (c, cs) => {
val sorted = sort_array(array(cs map col: _*), false)
array(sorted(0), sorted(1), sorted(2)).as(c)
}}
和select:
df.select($"id" +: cols.toSeq: _*)
import org.apache.spark.sql.functions._
var df = ...
val first3 = udf((list : Seq[Double]) => list.slice(0,3))
for (i <- 1 until 7) {
val columns = (1 until 24).map(x=> "d"+i+"h"+x)
df = df
.withColumn("d"+i, first3(sort_array(array(columns.head, columns.tail :_*), false)))
.drop(columns :_*)
}
这应该能满足您的需求。事实上,对于每一天,我都会将 24 小时聚合到一个数组列中,我按 desc 顺序排序,并从中 select 前 3 个元素。
我有一个包含 7 天 24 小时数据的数据框,所以它有 144 列。
id d1h1 d1h2 d1h3 ..... d7h24
aaa 21 24 8 ..... 14
bbb 16 12 2 ..... 4
ccc 21 2 7 ..... 6
我想做的是找出每天的最大 3 个值:
id d1 d2 d3 .... d7
aaa [22,2,2] [17,2,2] [21,8,3] [32,11,2]
bbb [32,22,12] [47,22,2] [31,14,3] [32,11,2]
ccc [12,7,4] [28,14,7] [11,2,1] [19,14,7]
定义模式:
val p = "^(d[1-7])h[0-9]{1,2}$".r
分组列:
import org.apache.spark.sql.functions._
val cols = df.columns.tail
.groupBy { case p(d) => d }
.map { case (c, cs) => {
val sorted = sort_array(array(cs map col: _*), false)
array(sorted(0), sorted(1), sorted(2)).as(c)
}}
和select:
df.select($"id" +: cols.toSeq: _*)
import org.apache.spark.sql.functions._
var df = ...
val first3 = udf((list : Seq[Double]) => list.slice(0,3))
for (i <- 1 until 7) {
val columns = (1 until 24).map(x=> "d"+i+"h"+x)
df = df
.withColumn("d"+i, first3(sort_array(array(columns.head, columns.tail :_*), false)))
.drop(columns :_*)
}
这应该能满足您的需求。事实上,对于每一天,我都会将 24 小时聚合到一个数组列中,我按 desc 顺序排序,并从中 select 前 3 个元素。