Spark 使用上一行的值将新列添加到数据框
Spark add new column to dataframe with value from previous row
我想知道如何在 Spark (Pyspark) 中实现以下目标
初始数据帧:
+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
结果数据框:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0| 7.0 |
+--+---+-------+
|3 |7.0| 3.0 |
+--+---+-------+
|2 |3.0| 5.0 |
+--+---+-------+
我通常设法 "append" 数据框的新列,方法如下:
df.withColumn("new_Col", df.num * 10)
但是我不知道如何为新列实现此 "shift of rows",以便新列具有前一行字段的值(如示例所示)。我在 API 文档中也找不到任何关于如何通过索引访问 DF 中的特定行的内容。
如有任何帮助,我们将不胜感激。
可以使用lag
window函数如下
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()
## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## | 2|3.0| 5.0|
## | 3|7.0| 3.0|
## | 4|9.0| 7.0|
## +---+---+-------+
但是有一些重要的问题:
- 如果你需要一个全局操作(不被其他列/列分区),它是非常低效的。
- 您需要一种自然的方式来订购您的数据。
虽然第二个问题几乎从来都不是问题,但第一个问题可能会破坏交易。如果是这种情况,您应该简单地将 DataFrame
转换为 RDD 并手动计算 lag
。参见示例:
- Apache Spark Moving Average(用 Scala 编写,但可以针对 PySpark 进行调整。请务必先阅读评论)。
其他有用的链接:
val df = sc.parallelize(Seq((4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0))).toDF("id", "num")
df.show
+---+---+
| id|num|
+---+---+
| 4|9.0|
| 3|7.0|
| 2|3.0|
| 1|5.0|
+---+---+
df.withColumn("new_column", lag("num", 1, 0).over(w)).show
+---+---+----------+
| id|num|new_column|
+---+---+----------+
| 1|5.0| 0.0|
| 2|3.0| 5.0|
| 3|7.0| 3.0|
| 4|9.0| 7.0|
+---+---+----------+
我想知道如何在 Spark (Pyspark) 中实现以下目标
初始数据帧:
+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
结果数据框:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0| 7.0 |
+--+---+-------+
|3 |7.0| 3.0 |
+--+---+-------+
|2 |3.0| 5.0 |
+--+---+-------+
我通常设法 "append" 数据框的新列,方法如下:
df.withColumn("new_Col", df.num * 10)
但是我不知道如何为新列实现此 "shift of rows",以便新列具有前一行字段的值(如示例所示)。我在 API 文档中也找不到任何关于如何通过索引访问 DF 中的特定行的内容。
如有任何帮助,我们将不胜感激。
可以使用lag
window函数如下
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()
## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## | 2|3.0| 5.0|
## | 3|7.0| 3.0|
## | 4|9.0| 7.0|
## +---+---+-------+
但是有一些重要的问题:
- 如果你需要一个全局操作(不被其他列/列分区),它是非常低效的。
- 您需要一种自然的方式来订购您的数据。
虽然第二个问题几乎从来都不是问题,但第一个问题可能会破坏交易。如果是这种情况,您应该简单地将 DataFrame
转换为 RDD 并手动计算 lag
。参见示例:
- Apache Spark Moving Average(用 Scala 编写,但可以针对 PySpark 进行调整。请务必先阅读评论)。
其他有用的链接:
val df = sc.parallelize(Seq((4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0))).toDF("id", "num")
df.show
+---+---+
| id|num|
+---+---+
| 4|9.0|
| 3|7.0|
| 2|3.0|
| 1|5.0|
+---+---+
df.withColumn("new_column", lag("num", 1, 0).over(w)).show
+---+---+----------+
| id|num|new_column|
+---+---+----------+
| 1|5.0| 0.0|
| 2|3.0| 5.0|
| 3|7.0| 3.0|
| 4|9.0| 7.0|
+---+---+----------+