Spark Dataframe Window lag function based on multiple columns
val df = sc.parallelize(Seq((201601, 100.5),
(201602, 120.6),
(201603, 450.2),
(201604, 200.7),
(201605, 121.4))).toDF("date", "volume")
import org.apache.spark.sql.functions.lag

val w = org.apache.spark.sql.expressions.Window.orderBy("date")
val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w))
leadDf.show()
+------+------+-------+
| date|volume|new_col|
+------+------+-------+
|201601| 100.5| 0.0|
|201602| 120.6| 100.5|
|201603| 450.2| 120.6|
|201604| 200.7| 450.2|
|201605| 121.4| 200.7|
+------+------+-------+
This works fine.
But what if there is one more column, territory, as below?
val df = sc.parallelize(Seq((201601, "ter1", 10.1),
(201601, "ter2", 10.6),
(201602, "ter1", 10.7),
(201603, "ter3", 10.8),
(201603, "ter4", 10.8),
(201603, "ter3", 10.8),
(201604, "ter4", 10.9))).toDF("date", "territory", "volume")
My requirement is: within the same territory, I want to find the previous month's volume if it exists, and assign 0.0 if it does not.
If I understand you correctly, you want the value from the previous date within the same territory.
If so, just add partitionBy, i.e. redefine your window specification as follows:
val w = org.apache.spark.sql.expressions.Window.partitionBy("territory").orderBy("date")
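Putting it together, here is a minimal sketch of the partitioned version (assuming a spark-shell session where spark.implicits._ is in scope and lag has been imported as above; df2, w2, prev_volume, and result are illustrative names, not from the original question):

// Territory DataFrame from the question, with string literals quoted
val df2 = sc.parallelize(Seq(
  (201601, "ter1", 10.1),
  (201601, "ter2", 10.6),
  (201602, "ter1", 10.7),
  (201603, "ter3", 10.8),
  (201603, "ter4", 10.8),
  (201603, "ter3", 10.8),
  (201604, "ter4", 10.9))).toDF("date", "territory", "volume")

// lag looks one row back within each territory partition, ordered by date;
// the third argument (0) is the default used for the first row of each partition
val w2 = org.apache.spark.sql.expressions.Window.partitionBy("territory").orderBy("date")
val result = df2.withColumn("prev_volume", lag("volume", 1, 0).over(w2))
result.show()

The first row of each territory then gets the default 0.0. Note that ter3 has two rows with the same date (201603), so the relative order of those two rows within the window, and hence their lag values, is not deterministic; add a secondary sort key to orderBy if you need a stable tie-break.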