Spark Dataframe - 窗口函数 - 插入和更新输出的滞后和超前
Spark Dataframe - Windowing Function - Lag & Lead for Insert & Update output
我需要使用窗口函数 Lag 和 Lead 对数据帧执行以下操作。
对于每个Key,我需要在最终输出中执行以下插入和更新
插入条件:
1. 默认情况下, LAYER_NO=0 ,需要在输出中写入。
2.如果COL1,COL2,COL3的值有任何变化,相对于它的珍贵记录,则需要在输出中写入该记录。
示例:key_1 且 layer_no=2,COL3
中的值从 400 变为 600
更新条件:
1. 如果 COL1,COL2,COL3 的值相对于其之前的记录没有变化,但 "DEPART column" 有变化,则需要在输出中更新此值。
示例:key_1 与 layer_no=3,COL1、COL2、COL3 没有变化,但 DEPART 列中的值发生变化为 "xyz",所以这需要将在输出中更新。
2. 即使是 LAYER_NO 也应该顺序更新,在插入 layer_no=0
的记录后
val inputDF = values.toDF("KEY","LAYER_NO","COl1","COl2","COl3","DEPART")
inputDF.show()
+-----+--------+----+----+----+------+
| KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1| 0| 200| 300| 400| abc|->default write
|key_1| 1| 200| 300| 400| abc|
|key_1| 2| 200| 300| 600| uil|--->change in col3,so write
|key_1| 2| 200| 300| 600| uil|
|key_1| 3| 200| 300| 600| xyz|--->change in col4,so update
|key_2| 0| 500| 700| 900| prq|->default write
|key_2| 1| 888| 555| 900| tep|--->change in col1 & col 2,so write
|key_3| 0| 111| 222| 333| lgh|->default write
|key_3| 1| 084| 222| 333| lgh|--->change in col1,so write
|key_3| 2| 084| 222| 333| rrr|--->change in col4,so update
+-----+--------+----+----+----+------+
预期输出:
outputDF.show()
+-----+--------+----+----+----+------+
| KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1| 0| 200| 300| 400| abc|
|key_1| 1| 200| 300| 600| xyz|
|key_2| 0| 500| 700| 900| prq|
|key_2| 1| 888| 555| 900| tep|
|key_3| 0| 111| 222| 333| lgh|
|key_3| 1| 084| 222| 333| rrr|
+-----+--------+----+----+----+------+
我们需要定义两个 Window
才能达到您的预期输出。一个用于检查 DEPART
列中的变化,第二个用于检查 COL1
与 COL3
.
之和的差异
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")
然后我们只需用正确的值替换 DEPART
列中的值,并将数据过滤到滞后总和与当前列总和不同的行(以及 LAYER_NO === 0
的行) .最后,我们用排名替换 LAYER_NO
。
inputDF.withColumn("DEPART", last("DEPART").over(w_col))
.withColumn("row_sum",($"COL1" + $"COL2" + $"COL3"))
.withColumn("lag_sum", lag($"row_sum",1).over(w_key))
.filter($"LAYER_NO" === 0 || not($"row_sum" === $"lag_sum"))
.withColumn("LAYER_NO", rank.over(w_key)-1)
.drop("row_sum", "lag_sum").show()
+-----+--------+----+----+----+------+
| KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1| 0| 200| 300| 400| abc|
|key_1| 1| 200| 300| 600| xyz|
|key_2| 0| 500| 700| 900| prq|
|key_2| 1| 888| 555| 900| tep|
|key_3| 0| 111| 222| 333| lgh|
|key_3| 1| 084| 222| 333| rrr|
+-----+--------+----+----+----+------+
我需要使用窗口函数 Lag 和 Lead 对数据帧执行以下操作。
对于每个Key,我需要在最终输出中执行以下插入和更新
插入条件:
1. 默认情况下, LAYER_NO=0 ,需要在输出中写入。
2.如果COL1,COL2,COL3的值有任何变化,相对于它的珍贵记录,则需要在输出中写入该记录。
示例:key_1 且 layer_no=2,COL3
中的值从 400 变为 600更新条件:
1. 如果 COL1,COL2,COL3 的值相对于其之前的记录没有变化,但 "DEPART column" 有变化,则需要在输出中更新此值。
示例:key_1 与 layer_no=3,COL1、COL2、COL3 没有变化,但 DEPART 列中的值发生变化为 "xyz",所以这需要将在输出中更新。
2. 即使是 LAYER_NO 也应该顺序更新,在插入 layer_no=0
val inputDF = values.toDF("KEY","LAYER_NO","COl1","COl2","COl3","DEPART")
inputDF.show()
+-----+--------+----+----+----+------+
| KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1| 0| 200| 300| 400| abc|->default write
|key_1| 1| 200| 300| 400| abc|
|key_1| 2| 200| 300| 600| uil|--->change in col3,so write
|key_1| 2| 200| 300| 600| uil|
|key_1| 3| 200| 300| 600| xyz|--->change in col4,so update
|key_2| 0| 500| 700| 900| prq|->default write
|key_2| 1| 888| 555| 900| tep|--->change in col1 & col 2,so write
|key_3| 0| 111| 222| 333| lgh|->default write
|key_3| 1| 084| 222| 333| lgh|--->change in col1,so write
|key_3| 2| 084| 222| 333| rrr|--->change in col4,so update
+-----+--------+----+----+----+------+
预期输出:
outputDF.show()
+-----+--------+----+----+----+------+
| KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1| 0| 200| 300| 400| abc|
|key_1| 1| 200| 300| 600| xyz|
|key_2| 0| 500| 700| 900| prq|
|key_2| 1| 888| 555| 900| tep|
|key_3| 0| 111| 222| 333| lgh|
|key_3| 1| 084| 222| 333| rrr|
+-----+--------+----+----+----+------+
我们需要定义两个 Window
才能达到您的预期输出。一个用于检查 DEPART
列中的变化,第二个用于检查 COL1
与 COL3
.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")
然后我们只需用正确的值替换 DEPART
列中的值,并将数据过滤到滞后总和与当前列总和不同的行(以及 LAYER_NO === 0
的行) .最后,我们用排名替换 LAYER_NO
。
inputDF.withColumn("DEPART", last("DEPART").over(w_col))
.withColumn("row_sum",($"COL1" + $"COL2" + $"COL3"))
.withColumn("lag_sum", lag($"row_sum",1).over(w_key))
.filter($"LAYER_NO" === 0 || not($"row_sum" === $"lag_sum"))
.withColumn("LAYER_NO", rank.over(w_key)-1)
.drop("row_sum", "lag_sum").show()
+-----+--------+----+----+----+------+
| KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1| 0| 200| 300| 400| abc|
|key_1| 1| 200| 300| 600| xyz|
|key_2| 0| 500| 700| 900| prq|
|key_2| 1| 888| 555| 900| tep|
|key_3| 0| 111| 222| 333| lgh|
|key_3| 1| 084| 222| 333| rrr|
+-----+--------+----+----+----+------+