使用日期的空数据转换数据集

Transform dataset with empty data for dates

我有一个包含日期、帐户 ID 和值的数据集。我想将数据集转换为一个新的数据集,如果 accountid 在特定日期不存在,则添加一个值为 0 的 accountid date.Is 这可能

    val df = sc.parallelize(Seq(("2018-01-01", 100.5,"id1"),
  ("2018-01-02", 120.6,"id1"),
  ("2018-01-03", 450.2,"id2")
  )).toDF("date", "val","accountid")
    +----------+-----+---------+
|      date|  val|accountid|
+----------+-----+---------+
|2018-01-01|100.5|      id1|
|2018-01-02|120.6|      id1|
|2018-01-03|450.2|      id2|
+----------+-----+---------+

我想将这个数据集转换成这种格式

+----------+-----+---------+
|      date|  val|accountid|
+----------+-----+---------+
|2018-01-01|100.5|      id1|
|2018-01-01|  0.0|      id2|
|2018-01-02|120.6|      id1|
|2018-01-02|  0.0|      id2|
|2018-01-03|450.2|      id2|
|2018-01-03|0.0  |      id1|
+----------+-----+---------+

您可以创建参考

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

val Row(minTs: Long, maxTs: Long) = df
  .select(to_date($"date").cast("timestamp").cast("bigint") as "date")
  .select(min($"date"), max($"date")).first

val by =  60 * 60 * 24

val ref = spark
  .range(minTs, maxTs + by, by)
  .select($"id".cast("timestamp").cast("date").cast("string").as("date"))
  .crossJoin(df.select("accountid").distinct)

并与输入数据进行外部联接:

ref.join(df, Seq("date", "accountid"), "leftouter").na.fill(0.0).show
// +----------+---------+-----+      
// |      date|accountid|  val|
// +----------+---------+-----+
// |2018-01-03|      id1|  0.0|
// |2018-01-01|      id1|100.5|
// |2018-01-02|      id2|  0.0|
// |2018-01-02|      id1|120.6|
// |2018-01-03|      id2|450.2|
// |2018-01-01|      id2|  0.0|
// +----------+---------+-----+

概念取自 by user6910411

您只需使用 udf 函数即可满足您的要求。

但在此之前,您必须获取 accountids 的完整集合广播[=38] =] 用于 udf 函数。

udf函数返回的数组将被展开并且最终select 列

import org.apache.spark.sql.functions._
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)

val broadCastedIdList = sc.broadcast(idList)

def populateUdf = udf((date: String, value: Double, accountid: String)=> Array(accounts(date, value, accountid)) ++ broadCastedIdList.value.filterNot(_ == accountid).map(accounts(date, 0.0, _)))

df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
    .withColumn("struct", explode(col("struct")))
    .select(col("struct.date"), col("struct.value").as("val"), col("struct.accountid"))
  .show(false)

当然你需要 case class

case class accounts(date:String, value:Double, accountid:String)

哪个应该给你

+----------+-----+---------+
|date      |val  |accountid|
+----------+-----+---------+
|2018-01-01|100.5|id1      |
|2018-01-01|0.0  |id2      |
|2018-01-02|120.6|id1      |
|2018-01-02|0.0  |id2      |
|2018-01-03|450.2|id2      |
|2018-01-03|0.0  |id1      |
+----------+-----+---------+

注意:在class的情况下使用value关键字,因为保留的标识符名称不能用作变量名称