Last unique entries above current row in Spark
I have a Spark dataframe with the following data:
val df = sc.parallelize(Seq(
(1, "A", "2022-01-01", 30, 0),
(1, "A", "2022-01-02", 20, 30),
(1, "B", "2022-01-03", 50, 20),
(1, "A", "2022-01-04", 10, 70),
(1, "B", "2022-01-05", 30, 60),
(1, "A", "2022-01-06", 0, 40),
(1, "C", "2022-01-07", 100,30),
(2, "D", "2022-01-08", 5, 0)
)).toDF("id", "event", "eventTimestamp", "amount", "expected")
display(df)
id | event | eventTimestamp | amount | expected
---|-------|----------------|--------|---------
1 | "A" | "2022-01-01" | 30 | 0
1 | "A" | "2022-01-02" | 20 | 30
1 | "B" | "2022-01-03" | 50 | 20
1 | "A" | "2022-01-04" | 10 | 70
1 | "B" | "2022-01-05" | 30 | 60
1 | "A" | "2022-01-06" | 0 | 40
1 | "C" | "2022-01-07" | 100 | 30
2 | "D" | "2022-01-08" | 5 | 0
For each row I want to find the following: the sum of the last entries (above the current row) of every unique event, per id. The desired result is shown in the "expected" column.
For example, for event "C" I want to get the latest amounts of "A" and "B": 30 + 0 = 30.
I tried the following query, but it sums up the amounts of all previous rows, including repeated events (I am not sure whether a filter can be applied to the sum so that only distinct values are taken):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val days = (x: Int) => x * 86400
val idWindow = Window.partitionBy("id")
  .orderBy(col("eventTimestamp").cast("timestamp").cast("long"))
  .rangeBetween(Window.unboundedPreceding, -days(1))
val res = df.withColumn("totalAmount", sum($"amount").over(idWindow))
Please note that the rangeBetween functionality is important for my use case and should be kept.
The trick is to turn the amounts into differences within each (id, event) pair, so that a moving sum can be computed in the next step. That moving sum maintains the latest amount of every unique event.
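The snippet below uses two window specs, wIdEvent and wId, that it does not define. A minimal sketch of what they could look like (an assumption, chosen so that it reproduces the output shown further down: differences per (id, event), running sum per id):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumed window specs (not shown in the original answer):
// wIdEvent groups rows per (id, event) to compute the per-event deltas,
// wId groups rows per id to accumulate those deltas over time.
val wIdEvent = Window.partitionBy("id", "event")
  .orderBy(col("eventTimestamp").cast("timestamp").cast("long"))
val wId = Window.partitionBy("id")
  .orderBy(col("eventTimestamp").cast("timestamp").cast("long"))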
df
  // express each amount as a delta against the previous amount of the same (id, event)
  .withColumn("diff", coalesce($"amount" - lag($"amount", 1).over(wIdEvent), $"amount"))
  // running sum of the deltas per id = sum of the latest amount of every event seen so far
  .withColumn("sum", sum($"diff").over(wId))
  // shift down by one row so the current row only sees amounts strictly above it
  .withColumn("final", coalesce(lag($"sum", 1).over(wId), lit(0)))
  .orderBy($"eventTimestamp").show
+---+-----+--------------+------+--------+----+---+-----+
| id|event|eventTimestamp|amount|expected|diff|sum|final|
+---+-----+--------------+------+--------+----+---+-----+
| 1| A| 2022-01-01| 30| 0| 30| 30| 0|
| 1| A| 2022-01-02| 20| 30| -10| 20| 30|
| 1| B| 2022-01-03| 50| 20| 50| 70| 20|
| 1| A| 2022-01-04| 10| 70| -10| 60| 70|
| 1| B| 2022-01-05| 30| 60| -20| 40| 60|
| 1| A| 2022-01-06| 0| 40| -10| 30| 40|
| 1| C| 2022-01-07| 100| 30| 100|130| 30|
| 2| D| 2022-01-08| 5| 0| 5| 5| 0|
+---+-----+--------------+------+--------+----+---+-----+
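As a quick sanity check (the val name res and the comparison are an addition for illustration, not part of the original answer), the computed final column can be compared row by row against the provided expected column:

// Re-run the same transformation without show and compare against "expected".
val res = df
  .withColumn("diff", coalesce($"amount" - lag($"amount", 1).over(wIdEvent), $"amount"))
  .withColumn("sum", sum($"diff").over(wId))
  .withColumn("final", coalesce(lag($"sum", 1).over(wId), lit(0)))

res.filter($"final" =!= $"expected").count   // expected to be 0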