spark sql window 函数滞后
spark sql window function lag
我正在查看 Scala 中 Spark DataFrame
的 window
滑动函数。
我有一个包含 Col1
、Col2
、Col3
、date
、volume
和 new_col
列的 DataFrame
.
Col1 Col2 Col3 date volume new_col
201601 100.5
201602 120.6 100.5
201603 450.2 120.6
201604 200.7 450.2
201605 121.4 200.7`
现在我想添加一个名称为(new_col
)的新列,其中一行向下滑动,如上所示。
我尝试了以下选项来使用 window 函数。
val windSldBrdrxNrx_df = df.withColumn("Prev_brand_rx", lag("Prev_brand_rx",1))
你有什么建议吗?
您做对了,您在 lag
上错过了 over(window expression)
val df = sc.parallelize(Seq((201601, 100.5),
(201602, 120.6),
(201603, 450.2),
(201604, 200.7),
(201605, 121.4))).toDF("date", "volume")
val w = org.apache.spark.sql.expressions.Window.orderBy("date")
import org.apache.spark.sql.functions.lag
val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w))
leadDf.show()
+------+------+-------+
| date|volume|new_col|
+------+------+-------+
|201601| 100.5| 0.0|
|201602| 120.6| 100.5|
|201603| 450.2| 120.6|
|201604| 200.7| 450.2|
|201605| 121.4| 200.7|
+------+------+-------+
此代码是 运行 在 Spark shell 2.0.2
上
您可以导入以下两个包,这将解决延迟依赖的问题。
import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window
我正在查看 Scala 中 Spark DataFrame
的 window
滑动函数。
我有一个包含 Col1
、Col2
、Col3
、date
、volume
和 new_col
列的 DataFrame
.
Col1 Col2 Col3 date volume new_col
201601 100.5
201602 120.6 100.5
201603 450.2 120.6
201604 200.7 450.2
201605 121.4 200.7`
现在我想添加一个名称为(new_col
)的新列,其中一行向下滑动,如上所示。
我尝试了以下选项来使用 window 函数。
val windSldBrdrxNrx_df = df.withColumn("Prev_brand_rx", lag("Prev_brand_rx",1))
你有什么建议吗?
您做对了,您在 lag
over(window expression)
val df = sc.parallelize(Seq((201601, 100.5),
(201602, 120.6),
(201603, 450.2),
(201604, 200.7),
(201605, 121.4))).toDF("date", "volume")
val w = org.apache.spark.sql.expressions.Window.orderBy("date")
import org.apache.spark.sql.functions.lag
val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w))
leadDf.show()
+------+------+-------+
| date|volume|new_col|
+------+------+-------+
|201601| 100.5| 0.0|
|201602| 120.6| 100.5|
|201603| 450.2| 120.6|
|201604| 200.7| 450.2|
|201605| 121.4| 200.7|
+------+------+-------+
此代码是 运行 在 Spark shell 2.0.2
上您可以导入以下两个包,这将解决延迟依赖的问题。
import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window