使用日期的空数据转换数据集
Transform dataset with empty data for dates
我有一个包含日期、帐户 ID 和值的数据集。我想将数据集转换为一个新的数据集,如果 accountid 在特定日期不存在,则添加一个值为 0 的 accountid date.Is 这可能
val df = sc.parallelize(Seq(("2018-01-01", 100.5,"id1"),
("2018-01-02", 120.6,"id1"),
("2018-01-03", 450.2,"id2")
)).toDF("date", "val","accountid")
+----------+-----+---------+
| date| val|accountid|
+----------+-----+---------+
|2018-01-01|100.5| id1|
|2018-01-02|120.6| id1|
|2018-01-03|450.2| id2|
+----------+-----+---------+
我想将这个数据集转换成这种格式
+----------+-----+---------+
| date| val|accountid|
+----------+-----+---------+
|2018-01-01|100.5| id1|
|2018-01-01| 0.0| id2|
|2018-01-02|120.6| id1|
|2018-01-02| 0.0| id2|
|2018-01-03|450.2| id2|
|2018-01-03|0.0 | id1|
+----------+-----+---------+
您可以创建参考
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
val Row(minTs: Long, maxTs: Long) = df
.select(to_date($"date").cast("timestamp").cast("bigint") as "date")
.select(min($"date"), max($"date")).first
val by = 60 * 60 * 24
val ref = spark
.range(minTs, maxTs + by, by)
.select($"id".cast("timestamp").cast("date").cast("string").as("date"))
.crossJoin(df.select("accountid").distinct)
并与输入数据进行外部联接:
ref.join(df, Seq("date", "accountid"), "leftouter").na.fill(0.0).show
// +----------+---------+-----+
// | date|accountid| val|
// +----------+---------+-----+
// |2018-01-03| id1| 0.0|
// |2018-01-01| id1|100.5|
// |2018-01-02| id2| 0.0|
// |2018-01-02| id1|120.6|
// |2018-01-03| id2|450.2|
// |2018-01-01| id2| 0.0|
// +----------+---------+-----+
概念取自 by user6910411。
您只需使用 udf
函数即可满足您的要求。
但在此之前,您必须获取 accountids 的完整集合 并广播[=38] =] 用于 udf
函数。
从udf
函数返回的数组将被展开并且最终select 列 。
import org.apache.spark.sql.functions._
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)
val broadCastedIdList = sc.broadcast(idList)
def populateUdf = udf((date: String, value: Double, accountid: String)=> Array(accounts(date, value, accountid)) ++ broadCastedIdList.value.filterNot(_ == accountid).map(accounts(date, 0.0, _)))
df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
.withColumn("struct", explode(col("struct")))
.select(col("struct.date"), col("struct.value").as("val"), col("struct.accountid"))
.show(false)
当然你需要 case class
case class accounts(date:String, value:Double, accountid:String)
哪个应该给你
+----------+-----+---------+
|date |val |accountid|
+----------+-----+---------+
|2018-01-01|100.5|id1 |
|2018-01-01|0.0 |id2 |
|2018-01-02|120.6|id1 |
|2018-01-02|0.0 |id2 |
|2018-01-03|450.2|id2 |
|2018-01-03|0.0 |id1 |
+----------+-----+---------+
注意:在class的情况下使用value关键字,因为保留的标识符名称不能用作变量名称
我有一个包含日期、帐户 ID 和值的数据集。我想将数据集转换为一个新的数据集,如果 accountid 在特定日期不存在,则添加一个值为 0 的 accountid date.Is 这可能
val df = sc.parallelize(Seq(("2018-01-01", 100.5,"id1"),
("2018-01-02", 120.6,"id1"),
("2018-01-03", 450.2,"id2")
)).toDF("date", "val","accountid")
+----------+-----+---------+
| date| val|accountid|
+----------+-----+---------+
|2018-01-01|100.5| id1|
|2018-01-02|120.6| id1|
|2018-01-03|450.2| id2|
+----------+-----+---------+
我想将这个数据集转换成这种格式
+----------+-----+---------+
| date| val|accountid|
+----------+-----+---------+
|2018-01-01|100.5| id1|
|2018-01-01| 0.0| id2|
|2018-01-02|120.6| id1|
|2018-01-02| 0.0| id2|
|2018-01-03|450.2| id2|
|2018-01-03|0.0 | id1|
+----------+-----+---------+
您可以创建参考
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
val Row(minTs: Long, maxTs: Long) = df
.select(to_date($"date").cast("timestamp").cast("bigint") as "date")
.select(min($"date"), max($"date")).first
val by = 60 * 60 * 24
val ref = spark
.range(minTs, maxTs + by, by)
.select($"id".cast("timestamp").cast("date").cast("string").as("date"))
.crossJoin(df.select("accountid").distinct)
并与输入数据进行外部联接:
ref.join(df, Seq("date", "accountid"), "leftouter").na.fill(0.0).show
// +----------+---------+-----+
// | date|accountid| val|
// +----------+---------+-----+
// |2018-01-03| id1| 0.0|
// |2018-01-01| id1|100.5|
// |2018-01-02| id2| 0.0|
// |2018-01-02| id1|120.6|
// |2018-01-03| id2|450.2|
// |2018-01-01| id2| 0.0|
// +----------+---------+-----+
概念取自
您只需使用 udf
函数即可满足您的要求。
但在此之前,您必须获取 accountids 的完整集合 并广播[=38] =] 用于 udf
函数。
从udf
函数返回的数组将被展开并且最终select 列 。
import org.apache.spark.sql.functions._
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)
val broadCastedIdList = sc.broadcast(idList)
def populateUdf = udf((date: String, value: Double, accountid: String)=> Array(accounts(date, value, accountid)) ++ broadCastedIdList.value.filterNot(_ == accountid).map(accounts(date, 0.0, _)))
df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
.withColumn("struct", explode(col("struct")))
.select(col("struct.date"), col("struct.value").as("val"), col("struct.accountid"))
.show(false)
当然你需要 case class
case class accounts(date:String, value:Double, accountid:String)
哪个应该给你
+----------+-----+---------+
|date |val |accountid|
+----------+-----+---------+
|2018-01-01|100.5|id1 |
|2018-01-01|0.0 |id2 |
|2018-01-02|120.6|id1 |
|2018-01-02|0.0 |id2 |
|2018-01-03|450.2|id2 |
|2018-01-03|0.0 |id1 |
+----------+-----+---------+
注意:在class的情况下使用value关键字,因为保留的标识符名称不能用作变量名称