Run a Cumulative/Iterative Custom Method on a Column in Spark Scala

Hi, I'm new to Spark/Scala, and I have been trying, and so far failing, to create a column in a Spark dataframe based on a particular recursive formula:

Here it is in pseudocode:

someDf.col2[0] = 0

for i > 0
    someDf.col2[i] = x * someDf.col1[i-1] + (1-x) * someDf.col2[i-1]
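
To make the intended semantics concrete, here is a minimal sketch of that recurrence on a plain Scala collection (the x value and the col1 series below are made up purely for illustration):

val x = 0.5                      // made-up multiplier, just for this sketch
val col1 = Seq(0, 0, 1, 1, 1, 1) // made-up col1 values for a single id, in date order

// col2(0) = 0; col2(i) = x * col1(i-1) + (1-x) * col2(i-1)
val col2 = col1.init.scanLeft(0.0) {
  (prevCol2, prevCol1) => x * prevCol1 + (1 - x) * prevCol2
}
// col2 == List(0.0, 0.0, 0.0, 0.5, 0.75, 0.875)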

To go into a bit more detail, here is my starting point: this dataframe is the result of aggregations at the level of dates and individual ids.

All further calculations have to be done for that particular id, and have to take into account what happened in the previous week.

To illustrate this I have simplified the values to zeros and ones, removed the multipliers x and 1-x, and initialized col2 to zero.

var someDf = Seq(("2016-01-10 00:00:00.0","385608",0,0), 
         ("2016-01-17 00:00:00.0","385608",0,0),
         ("2016-01-24 00:00:00.0","385608",1,0),
         ("2016-01-31 00:00:00.0","385608",1,0),
         ("2016-02-07 00:00:00.0","385608",1,0),
         ("2016-02-14 00:00:00.0","385608",1,0),
         ("2016-01-17 00:00:00.0","105010",0,0),
         ("2016-01-24 00:00:00.0","105010",1,0),
         ("2016-01-31 00:00:00.0","105010",0,0),
         ("2016-02-07 00:00:00.0","105010",1,0)
        ).toDF("dates", "id", "col1","col2" )

someDf.show()
+--------------------+------+----+----+
|               dates|    id|col1|col2|
+--------------------+------+----+----+
|2016-01-10 00:00:...|385608|   0|   0|
|2016-01-17 00:00:...|385608|   0|   0|
|2016-01-24 00:00:...|385608|   1|   0|
|2016-01-31 00:00:...|385608|   1|   0|
|2016-02-07 00:00:...|385608|   1|   0|
|2016-02-14 00:00:...|385608|   1|   0|
|2016-01-17 00:00:...|105010|   0|   0|
|2016-01-24 00:00:...|105010|   1|   0|
|2016-01-31 00:00:...|105010|   0|   0|
|2016-02-07 00:00:...|105010|   1|   0|
+--------------------+------+----+----+

What I have tried so far vs. what I want

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val date_id_window = Window.partitionBy("id").orderBy(asc("dates")) 

someDf.withColumn("col2", lag($"col1", 1).over(date_id_window) +
  lag($"col2", 1).over(date_id_window)).show()
+--------------------+------+----+----+ / +--------------------+
|               dates|    id|col1|col2| / | what_col2_should_be|
+--------------------+------+----+----+ / +--------------------+
|2016-01-17 00:00:...|105010|   0|null| / |                   0| 
|2016-01-24 00:00:...|105010|   1|   0| / |                   0|
|2016-01-31 00:00:...|105010|   0|   1| / |                   1|
|2016-02-07 00:00:...|105010|   1|   0| / |                   1|
|2016-01-10 00:00:...|385608|   0|null| / |                   0|
|2016-01-17 00:00:...|385608|   0|   0| / |                   0|
|2016-01-24 00:00:...|385608|   1|   0| / |                   0|
|2016-01-31 00:00:...|385608|   1|   1| / |                   1|
|2016-02-07 00:00:...|385608|   1|   1| / |                   2|
|2016-02-14 00:00:...|385608|   1|   1| / |                   3|
+--------------------+------+----+----+ / +--------------------+

Is there a way to do this with Spark dataframes? I have seen plenty of cumulative-type calculations, but never one that includes the column being computed itself. I think the problem is that the newly calculated value of row i-1 is never taken into account; instead the old value of row i-1 is used, which is always 0.
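
Note that, since col2[0] = 0, the recurrence unrolls into a weighted sum of the earlier col1 values:

col2[i] = x * sum over j < i of (1-x)^(i-1-j) * col1[j]

In the simplified example, where the x and 1-x multipliers are dropped, this is just the running sum of the lagged col1 values.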

Any help would be greatly appreciated.

Datasets should work just fine:

val x = 0.1

case class Record(dates: String, id: String, col1: Int)

someDf.drop("col2").as[Record]       // drop the placeholder col2 and work with a typed Dataset
  .groupByKey(_.id)                  // one group per id
  .flatMapGroups { (_, records) =>
    // sort each id's rows by date, then fold the recurrence carrying the previous col2
    val sorted = records.toSeq.sortBy(_.dates)
    sorted.scanLeft((null: Record, 0.0)) {
      case ((_, col2), record) => (record, x * record.col1 + (1 - x) * col2)
    }.tail                           // drop the (null, 0.0) seed row
  }
  .select($"_1.*", $"_2".alias("col2"))

You can use the rowsBetween api with the Window function you are already using, and you should get the desired output:

val date_id_window = Window.partitionBy("id").orderBy(asc("dates"))
someDf.withColumn("col2", sum(lag($"col1", 1).over(date_id_window)).over(date_id_window.rowsBetween(Long.MinValue, 0)))
  .withColumn("col2", when($"col2".isNull, lit(0)).otherwise($"col2"))
  .show()
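
Summing lag(col1, 1) over an unbounded-preceding-to-current-row frame is equivalent to summing col1 itself over a frame that ends at the previous row, because sum skips the leading null. As a sketch (not part of the original answer), the same result could also be written as:

someDf
  .withColumn("col2", sum($"col1").over(date_id_window.rowsBetween(Long.MinValue, -1)))
  .withColumn("col2", coalesce($"col2", lit(0)))
  .show()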

Given the input dataframe as

+--------------------+------+----+----+
|               dates|    id|col1|col2|
+--------------------+------+----+----+
|2016-01-10 00:00:...|385608|   0|   0|
|2016-01-17 00:00:...|385608|   0|   0|
|2016-01-24 00:00:...|385608|   1|   0|
|2016-01-31 00:00:...|385608|   1|   0|
|2016-02-07 00:00:...|385608|   1|   0|
|2016-02-14 00:00:...|385608|   1|   0|
|2016-01-17 00:00:...|105010|   0|   0|
|2016-01-24 00:00:...|105010|   1|   0|
|2016-01-31 00:00:...|105010|   0|   0|
|2016-02-07 00:00:...|105010|   1|   0|
+--------------------+------+----+----+

After applying the above logic you should have the output dataframe

+--------------------+------+----+----+
|               dates|    id|col1|col2|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010|   0|   0|
|2016-01-24 00:00:...|105010|   1|   0|
|2016-01-31 00:00:...|105010|   0|   1|
|2016-02-07 00:00:...|105010|   1|   1|
|2016-01-10 00:00:...|385608|   0|   0|
|2016-01-17 00:00:...|385608|   0|   0|
|2016-01-24 00:00:...|385608|   1|   0|
|2016-01-31 00:00:...|385608|   1|   1|
|2016-02-07 00:00:...|385608|   1|   2|
|2016-02-14 00:00:...|385608|   1|   3|
+--------------------+------+----+----+

Hope the answer is helpful.

You should be applying a transformation to the dataframe rather than treating it as a var. One way of getting what you want is to use Window's rowsBetween to cumulatively sum col1 over the rows within each window partition up to and including the previous row (i.e. row -1):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("id").orderBy("dates").rowsBetween(Long.MinValue, -1)

val newDF = someDf.
  withColumn(
    "col2", sum($"col1").over(window)
  ).withColumn(
    "col2", when($"col2".isNull, 0).otherwise($"col2")
  ).orderBy("id", "dates")

newDF.show
+--------------------+------+----+----+
|               dates|    id|col1|col2|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010|   0|   0|
|2016-01-24 00:00:...|105010|   1|   0|
|2016-01-31 00:00:...|105010|   0|   1|
|2016-02-07 00:00:...|105010|   1|   1|
|2016-01-10 00:00:...|385608|   0|   0|
|2016-01-17 00:00:...|385608|   0|   0|
|2016-01-24 00:00:...|385608|   1|   0|
|2016-01-31 00:00:...|385608|   1|   1|
|2016-02-07 00:00:...|385608|   1|   2|
|2016-02-14 00:00:...|385608|   1|   3|
+--------------------+------+----+----+
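
The window-based snippets above compute the simplified zero/one version only. If a window-only solution for the general recurrence is wanted, here is one possible sketch (not taken from any of the answers), based on the unrolled form col2[i] = x * sum over j < i of (1-x)^(i-1-j) * col1[j]: each past col1 is rescaled so that a single running sum does the job. The names w, wPrev, rn and weighted are introduced here, and because pow(1 - x, -rn) grows geometrically with the row number, this is only numerically reasonable for fairly short per-id series.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val x = 0.1  // example smoothing factor

val w = Window.partitionBy("id").orderBy("dates")
val wPrev = w.rowsBetween(Long.MinValue, -1)

someDf
  .withColumn("rn", row_number().over(w))
  // rescale each row's col1 so the plain running sum below carries the (1-x) powers
  .withColumn("weighted", $"col1" * pow(lit(1 - x), -$"rn"))
  .withColumn("col2",
    coalesce(
      lit(x) * pow(lit(1 - x), $"rn" - 1) * sum($"weighted").over(wPrev),
      lit(0.0)))
  .drop("rn", "weighted")
  .orderBy("id", "dates")
  .show()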