通过 Spark 数据框迭代获取日期月份

Get date months with iteration over Spark dataframe

我有一个问题案例要根据输入日期迭代过去 36 个月。目前使用 Scala,通过 DataFrame 我正在获取时间戳字段的最大值。例如:

val vGetDate = hc.read.format("filodb.spark").option("database","YYYYY").option("dataset","XXX").load().agg(max("inv_date").alias("max_date"))

例如日期为 2017-12-01 00:00:00

现在我需要迭代以获取过去 36 个月的日期。

请注意,我在 Cassandra 上使用 Spark 1.4 和 FiloDB。

如果您可以访问 Spark 1.5+,则可以使用 year and month 函数轻松完成此操作,但由于您只能访问 Spark 1.4,因此您必须在 UDF 中复制它们的功能,如下所示:

val year = udf {
  (s: String) =>
    java.sql.Timestamp.valueOf(s).getYear
}

val month = udf {
  (s: String) =>
    java.sql.Timestamp.valueOf(s).getMonth
}

def monthDiff(a: Column, b: Column): Column =
  (year(a) - year(b)) * 12 + (month(a) - month(b))

然后您可以使用带有 where 子句的 UDF 来过滤您的 DataFrame,就像我在这个例子中应该做的那样:

final case class Data(date: String)

val df = spark.createDataFrame(Seq(Data("2017-04-01 00:00:00")))

val since = lit("2018-01-01 00:00:00")

assert(df.where(monthDiff(since, $"date") < 36).count == 1)

assert(df.where(monthDiff(since, $"date") < 4).count == 0)

关于您需要迭代,您在使用 Spark DataFrame API 时通常想要采用的是 声明式方法,使用 groupBy 语句按您的键进行聚合。比如你原来的查询可以这样表示():

val vGetDate = 
  hc.read.format("filodb.spark").option("database","YYYYY").option("dataset","XXX").load().
  where(monthDiff(lit(startDate), "inv_date")).
  groupBy(concat(year("inv_date"), lit("-"), lpad(month("inv_date"), 2, "0"))).
  agg(max("inv_date").alias("max_date"))

在这里,您生成所有组的最新日期(由分组键定义为格式为 yyyy-MM 的年份和月份,其 "inv_date" 与一些 startDate.