将火花数据框中的多行合并为一行
Merging multiple rows in a spark dataframe into a single row
我有一个包含 2 列的数据框:时间戳、值
时间戳是自纪元以来的时间,值是浮点值。
我想按最小值将行合并为平均值。
这意味着我想获取时间戳来自同一圆分钟(自纪元以来 60 秒间隔)的所有行,并将它们合并为一行,其中值列将是所有值的平均值。
举个例子,假设我的数据框如下所示:
timestamp value
--------- -----
1441637160 10.0
1441637170 20.0
1441637180 30.0
1441637210 40.0
1441637220 10.0
1441637230 0.0
前 4 行是同一分钟的一部分 (1441637160 % 60 == 0, 1441637160 + 60 == 1441637220)
最后两行是另一分钟的一部分。
我想合并同一分钟的所有行。得到如下所示的结果:
timestamp value
--------- -----
1441637160 25.0 (since (10+20+30+40)/4 = 25)
1441637220 5.0 (since (10+0)/2 = 5)
最好的方法是什么?
首先将时间戳映射到分钟桶,然后使用groupByKey计算平均值。例如:
rdd.map(x=>{val round = x._1%60; (x._1-round, x._2);})
.groupByKey
.map(x=>(x._1, (x._2.sum.toDouble/x._2.size)))
.collect()
您可以简单地分组和聚合。数据为:
val df = sc.parallelize(Seq(
(1441637160, 10.0),
(1441637170, 20.0),
(1441637180, 30.0),
(1441637210, 40.0),
(1441637220, 10.0),
(1441637230, 0.0))).toDF("timestamp", "value")
导入所需的函数和类:
import org.apache.spark.sql.functions.{lit, floor}
import org.apache.spark.sql.types.IntegerType
创建间隔列:
val tsGroup = (floor($"timestamp" / lit(60)) * lit(60))
.cast(IntegerType)
.alias("timestamp")
并用它来执行聚合:
df.groupBy(tsGroup).agg(mean($"value").alias("value")).show
// +----------+-----+
// | timestamp|value|
// +----------+-----+
// |1441637160| 25.0|
// |1441637220| 5.0|
// +----------+-----+
我有一个包含 2 列的数据框:时间戳、值 时间戳是自纪元以来的时间,值是浮点值。 我想按最小值将行合并为平均值。 这意味着我想获取时间戳来自同一圆分钟(自纪元以来 60 秒间隔)的所有行,并将它们合并为一行,其中值列将是所有值的平均值。
举个例子,假设我的数据框如下所示:
timestamp value
--------- -----
1441637160 10.0
1441637170 20.0
1441637180 30.0
1441637210 40.0
1441637220 10.0
1441637230 0.0
前 4 行是同一分钟的一部分 (1441637160 % 60 == 0, 1441637160 + 60 == 1441637220) 最后两行是另一分钟的一部分。 我想合并同一分钟的所有行。得到如下所示的结果:
timestamp value
--------- -----
1441637160 25.0 (since (10+20+30+40)/4 = 25)
1441637220 5.0 (since (10+0)/2 = 5)
最好的方法是什么?
首先将时间戳映射到分钟桶,然后使用groupByKey计算平均值。例如:
rdd.map(x=>{val round = x._1%60; (x._1-round, x._2);})
.groupByKey
.map(x=>(x._1, (x._2.sum.toDouble/x._2.size)))
.collect()
您可以简单地分组和聚合。数据为:
val df = sc.parallelize(Seq(
(1441637160, 10.0),
(1441637170, 20.0),
(1441637180, 30.0),
(1441637210, 40.0),
(1441637220, 10.0),
(1441637230, 0.0))).toDF("timestamp", "value")
导入所需的函数和类:
import org.apache.spark.sql.functions.{lit, floor}
import org.apache.spark.sql.types.IntegerType
创建间隔列:
val tsGroup = (floor($"timestamp" / lit(60)) * lit(60))
.cast(IntegerType)
.alias("timestamp")
并用它来执行聚合:
df.groupBy(tsGroup).agg(mean($"value").alias("value")).show
// +----------+-----+
// | timestamp|value|
// +----------+-----+
// |1441637160| 25.0|
// |1441637220| 5.0|
// +----------+-----+