Last unique entries above current row in Spark

I have a Spark DataFrame with the following data:

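import spark.implicits._ // needed for toDF and $"..." outside notebooks that import it automatically
val df = sc.parallelize(Seq(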
  (1, "A", "2022-01-01", 30, 0), 
  (1, "A", "2022-01-02", 20, 30),
  (1, "B", "2022-01-03", 50, 20),
  (1, "A", "2022-01-04", 10, 70),
  (1, "B", "2022-01-05", 30, 60), 
  (1, "A", "2022-01-06", 0,  40), 
  (1, "C", "2022-01-07", 100,30), 
  (2, "D", "2022-01-08", 5, 0)
)).toDF("id", "event", "eventTimestamp", "amount", "expected")

display(df)
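+---+-----+--------------+------+--------+
| id|event|eventTimestamp|amount|expected|
+---+-----+--------------+------+--------+
|  1|    A|    2022-01-01|    30|       0|
|  1|    A|    2022-01-02|    20|      30|
|  1|    B|    2022-01-03|    50|      20|
|  1|    A|    2022-01-04|    10|      70|
|  1|    B|    2022-01-05|    30|      60|
|  1|    A|    2022-01-06|     0|      40|
|  1|    C|    2022-01-07|   100|      30|
|  2|    D|    2022-01-08|     5|       0|
+---+-----+--------------+------+--------+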

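For each row I want the following: per id, the sum of the last entry (above the current row) of every unique event. The desired result is in the "expected" column.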

For example, for event "C" I want the latest amounts of "A" and "B": 0 + 30 = 30.
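To pin the definition down, here is a brute-force reference in plain Scala (no Spark). It is only a sketch: it assumes the timestamps are ISO `yyyy-MM-dd` strings, so string comparison orders them correctly, and `Entry`, `rows` and `expectedFor` are hypothetical names. It reproduces the "expected" column, so it can serve as a check for any window-based solution.

```scala
// Plain Scala (no Spark) brute-force reference for the "expected" column.
// Assumption: eventTimestamp is a yyyy-MM-dd string, so `<` orders it chronologically.
final case class Entry(id: Int, event: String, eventTimestamp: String, amount: Int)

val rows = Seq(
  Entry(1, "A", "2022-01-01", 30),
  Entry(1, "A", "2022-01-02", 20),
  Entry(1, "B", "2022-01-03", 50),
  Entry(1, "A", "2022-01-04", 10),
  Entry(1, "B", "2022-01-05", 30),
  Entry(1, "A", "2022-01-06", 0),
  Entry(1, "C", "2022-01-07", 100),
  Entry(2, "D", "2022-01-08", 5)
)

def expectedFor(r: Entry, all: Seq[Entry]): Int =
  all
    .filter(p => p.id == r.id && p.eventTimestamp < r.eventTimestamp) // rows above, same id
    .groupBy(_.event)                                                  // one group per unique event
    .values
    .map(_.maxBy(_.eventTimestamp).amount)                             // latest amount of each event
    .sum                                                               // empty -> 0

val expected = rows.map(r => expectedFor(r, rows))
println(expected.mkString(", ")) // 0, 30, 20, 70, 60, 40, 30, 0
```

This is quadratic in the number of rows per id, so it is meant for checking results on small samples, not for production data.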

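I tried the following query, but it sums the amounts of all previous events, including repeated ones (I'm not sure whether a filter can be applied to the sum so that only distinct values are taken):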

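import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val days = (x: Int) => x * 86400 // days -> seconds
// Frame: all rows of the same id that are at least one day before the current row
val idWindow = Window.partitionBy("id")
  .orderBy(col("eventTimestamp").cast("timestamp").cast("long"))
  .rangeBetween(Window.unboundedPreceding, -days(1))

val res = df.withColumn("totalAmount", sum($"amount").over(idWindow))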

Note that the rangeBetween behavior matters for my use case and should be kept.

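The trick is to turn the amounts into differences within each (id, event) pair, so that a running sum can be computed in the next step. Because the differences of one event telescope (they add up to its latest amount), that running sum always holds the latest amount of each unique event.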

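import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wIdEvent = Window.partitionBy("id", "event").orderBy("eventTimestamp") // previous row of the same event
val wId = Window.partitionBy("id").orderBy("eventTimestamp")                // running frame per id
df
  .withColumn("diff", coalesce($"amount" - lag($"amount", 1).over(wIdEvent), $"amount")) // change since the event's previous amount
  .withColumn("sum", sum($"diff").over(wId))                                             // sum of latest amounts per event so far
  .withColumn("final", coalesce(lag($"sum", 1).over(wId), lit(0)))                       // exclude the current row; 0 for the first row
  .orderBy($"eventTimestamp").show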

+---+-----+--------------+------+--------+----+---+-----+
| id|event|eventTimestamp|amount|expected|diff|sum|final|
+---+-----+--------------+------+--------+----+---+-----+
|  1|    A|    2022-01-01|    30|       0|  30| 30|    0|
|  1|    A|    2022-01-02|    20|      30| -10| 20|   30|
|  1|    B|    2022-01-03|    50|      20|  50| 70|   20|
|  1|    A|    2022-01-04|    10|      70| -10| 60|   70|
|  1|    B|    2022-01-05|    30|      60| -20| 40|   60|
|  1|    A|    2022-01-06|     0|      40| -10| 30|   40|
|  1|    C|    2022-01-07|   100|      30| 100|130|   30|
|  2|    D|    2022-01-08|     5|       0|   5|  5|    0|
+---+-----+--------------+------+--------+----+---+-----+
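Since the question asks to keep rangeBetween, note that the `diff` column should also work with a range frame: instead of `lag` over the running sum, the diffs can be summed over the question's `idWindow` (in Spark, presumably `coalesce(sum($"diff").over(idWindow), lit(0))`; this Spark expression is an untested assumption). The following plain Scala sketch (no Spark) simulates that frame to show the arithmetic. It assumes `eventTimestamp` holds `yyyy-MM-dd` dates, and `Entry`, `rows`, `withDiff`, `day` and `result` are hypothetical names.

```scala
// Plain Scala (no Spark) simulation of the diff trick combined with a
// "rows at least one day earlier" frame, like rangeBetween(unboundedPreceding, -days(1)).
// Assumption: eventTimestamp is a yyyy-MM-dd date string.
import java.time.LocalDate

final case class Entry(id: Int, event: String, eventTimestamp: String, amount: Int)

val rows = Seq(
  Entry(1, "A", "2022-01-01", 30), Entry(1, "A", "2022-01-02", 20),
  Entry(1, "B", "2022-01-03", 50), Entry(1, "A", "2022-01-04", 10),
  Entry(1, "B", "2022-01-05", 30), Entry(1, "A", "2022-01-06", 0),
  Entry(1, "C", "2022-01-07", 100), Entry(2, "D", "2022-01-08", 5)
)

// Step 1: diff = amount minus the previous amount of the same (id, event),
// or the amount itself for the first occurrence (the coalesce/lag step).
val withDiff: Seq[(Entry, Int)] = rows.map { r =>
  val prev = rows
    .filter(p => p.id == r.id && p.event == r.event && p.eventTimestamp < r.eventTimestamp)
    .sortBy(_.eventTimestamp)
    .lastOption
  (r, r.amount - prev.map(_.amount).getOrElse(0))
}

def day(s: String): Long = LocalDate.parse(s).toEpochDay

// Step 2: per row, sum the diffs of same-id rows dated at least one day earlier.
// Per event the diffs telescope to its latest amount in that frame; an empty frame sums to 0.
val result = withDiff.map { case (r, _) =>
  withDiff.collect {
    case (p, d) if p.id == r.id && day(p.eventTimestamp) <= day(r.eventTimestamp) - 1 => d
  }.sum
}
println(result.mkString(", ")) // 0, 30, 20, 70, 60, 40, 30, 0
```

With daily timestamps the range frame and the "previous row" shift give the same numbers here; they would differ if several rows of one id fell within the same day.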