Spark 1.6,DataFrame:通过添加行来填补空白
Spark 1.6, DataFrame: fill gaps by adding rows
我有一个如下所示的 DataFrame:
+-----+---+-----+
| id |ind| freq|
+-----+---+-----+
|user1| 1| 5|
|user2| 0| 13|
|user2| 2| 4|
|user3| 2| 7|
|user3| 3| 45|
+-----+---+-----+
列 ind
具有介于 0 和 3 之间的整数值。
我想为每个用户添加缺失的 ind
值,同时用默认值(例如 0)填充 freq
列,因此输出 DataFrame 如下所示:
+-----+---+-----+
| id |ind| freq|
+-----+---+-----+
|user1| 0| 0|
|user1| 1| 5|
|user1| 2| 0|
|user1| 3| 0|
|user2| 0| 13|
|user2| 1| 0|
|user2| 2| 4|
|user2| 3| 0|
|user3| 0| 0|
|user3| 1| 0|
|user3| 2| 7|
|user3| 3| 45|
+-----+---+-----+
最有效的方法是什么?
这不是人们能想出的最佳解决方案,但从我的头脑来看,它可以完成工作:
import org.apache.spark.sql.Row
val df = sc.parallelize(List(("user1",1,5),("user2", 0, 13),("user2", 2, 4),("user3", 2, 7),("user3", 3, 45))).toDF("id","ind","freq")
df.show
// +-----+---+----+
// | id|ind|freq|
// +-----+---+----+
// |user1| 1| 5|
// |user2| 0| 13|
// |user2| 2| 4|
// |user3| 2| 7|
// |user3| 3| 45|
// +-----+---+----+
val df2 = df.groupBy('id).pivot("ind").max("freq").na.fill(0)
df2.show
// +-----+---+---+---+---+
// | id| 0| 1| 2| 3|
// +-----+---+---+---+---+
// |user1| 0| 5| 0| 0|
// |user2| 13| 0| 4| 0|
// |user3| 0| 0| 7| 45|
// +-----+---+---+---+---+
val cols = df2.columns
val df3 = df2.rdd.map {
case r : Row =>
val id = r.getAs[String]("id")
cols.map(ind => (id,ind,r.getAs[Integer](ind)))
}.flatMap(_.toSeq).filter(_._2 != "id").toDF("id","ind","freq")
df3.show
// +-----+---+----+
// | id|ind|freq|
// +-----+---+----+
// |user1| 0| 0|
// |user1| 1| 5|
// |user1| 2| 0|
// |user1| 3| 0|
// |user2| 0| 13|
// |user2| 1| 0|
// |user2| 2| 4|
// |user2| 3| 0|
// |user3| 0| 0|
// |user3| 1| 0|
// |user3| 2| 7|
// |user3| 3| 45|
// +-----+---+----+
我正在使用 GroupeData
中的 pivot
函数,然后按列展平它。 (Spark 1.6+)
PS: 这个解决方案没有优化,我有很多缺点。即:大量索引、计算成本等
我只是 运行 解决了这个 "gap" 问题,我想出的解决方案很幼稚,所以它可能效率不高,但我认为它非常简单。
基本上是为了穷尽 (id, ind)
对的所有组合,从原始 DF I select id
的不同值和 select [=12 的不同值=],然后交叉连接这两个结果得到所有的组合。之后只需将结果连接回原始 DF 并将所有 NA 填充为 0.
我有一个如下所示的 DataFrame:
+-----+---+-----+
| id |ind| freq|
+-----+---+-----+
|user1| 1| 5|
|user2| 0| 13|
|user2| 2| 4|
|user3| 2| 7|
|user3| 3| 45|
+-----+---+-----+
列 ind
具有介于 0 和 3 之间的整数值。
我想为每个用户添加缺失的 ind
值,同时用默认值(例如 0)填充 freq
列,因此输出 DataFrame 如下所示:
+-----+---+-----+
| id |ind| freq|
+-----+---+-----+
|user1| 0| 0|
|user1| 1| 5|
|user1| 2| 0|
|user1| 3| 0|
|user2| 0| 13|
|user2| 1| 0|
|user2| 2| 4|
|user2| 3| 0|
|user3| 0| 0|
|user3| 1| 0|
|user3| 2| 7|
|user3| 3| 45|
+-----+---+-----+
最有效的方法是什么?
这不是人们能想出的最佳解决方案,但从我的头脑来看,它可以完成工作:
import org.apache.spark.sql.Row
val df = sc.parallelize(List(("user1",1,5),("user2", 0, 13),("user2", 2, 4),("user3", 2, 7),("user3", 3, 45))).toDF("id","ind","freq")
df.show
// +-----+---+----+
// | id|ind|freq|
// +-----+---+----+
// |user1| 1| 5|
// |user2| 0| 13|
// |user2| 2| 4|
// |user3| 2| 7|
// |user3| 3| 45|
// +-----+---+----+
val df2 = df.groupBy('id).pivot("ind").max("freq").na.fill(0)
df2.show
// +-----+---+---+---+---+
// | id| 0| 1| 2| 3|
// +-----+---+---+---+---+
// |user1| 0| 5| 0| 0|
// |user2| 13| 0| 4| 0|
// |user3| 0| 0| 7| 45|
// +-----+---+---+---+---+
val cols = df2.columns
val df3 = df2.rdd.map {
case r : Row =>
val id = r.getAs[String]("id")
cols.map(ind => (id,ind,r.getAs[Integer](ind)))
}.flatMap(_.toSeq).filter(_._2 != "id").toDF("id","ind","freq")
df3.show
// +-----+---+----+
// | id|ind|freq|
// +-----+---+----+
// |user1| 0| 0|
// |user1| 1| 5|
// |user1| 2| 0|
// |user1| 3| 0|
// |user2| 0| 13|
// |user2| 1| 0|
// |user2| 2| 4|
// |user2| 3| 0|
// |user3| 0| 0|
// |user3| 1| 0|
// |user3| 2| 7|
// |user3| 3| 45|
// +-----+---+----+
我正在使用 GroupeData
中的 pivot
函数,然后按列展平它。 (Spark 1.6+)
PS: 这个解决方案没有优化,我有很多缺点。即:大量索引、计算成本等
我只是 运行 解决了这个 "gap" 问题,我想出的解决方案很幼稚,所以它可能效率不高,但我认为它非常简单。
基本上是为了穷尽 (id, ind)
对的所有组合,从原始 DF I select id
的不同值和 select [=12 的不同值=],然后交叉连接这两个结果得到所有的组合。之后只需将结果连接回原始 DF 并将所有 NA 填充为 0.