Pyspark 中的滞后移位函数

lag shift Funtion in Pyspark

我想在 1 到 7 的范围内移动订单数量。这是我的 python 代码 :

def make_features(data, max_lag):    
    for lag in range(1, max_lag + 1):
        data['lag_{}'.format(lag)] = data['num_orders'].shift(lag)

make_features(df, 7)

我尝试在 Pyspark 中做同样的思考:代码

def make_features(data, max_lag):    
    for lag in range(1, max_lag + 1):
        data['lag_{}'.format(lag)] = data['num_orders'].shift(lag)

make_features(df, 7)   

我得到这个错误

    TypeError: 'int' object is not callable
Traceback (most recent call last):

TypeError: 'int' object is not callable   

我也试试这个 code :

for lag in range(1, 8):
    window = Window.orderBy("date")
    lagCol = lag(col("num_orders"), n).over(window)
    df.withColumn(f"LagCol_{n}", lagCol)

这只是移动了 1 个单位:

预期结果:

在 PySpark 中,没有您预期的 shift 函数,您在使用 lag 时方向正确。但是当你必须根据 lag_1lag_2 等等时,这里有一个小技巧。

from pyspark.sql import functions as F
from pyspark.sql import Window as W

df = df.withColumn('lag_0', F.col('num_orders'))
for lag in range(1, 8):
    df = (df
        .withColumn(f'lag_{lag}', F
            .lag(f'lag_{lag - 1}')
            .over(W
                .partitionBy(F.lit(1))
                .orderBy('date')
            )
        )
    )


+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+
|date|num_orders|lag_0|lag_1|lag_2|lag_3|lag_4|lag_5|lag_6|lag_7|
+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+
|   1|       124|  124| null| null| null| null| null| null| null|
|   2|        85|   85|  124| null| null| null| null| null| null|
|   3|        71|   71|   85|  124| null| null| null| null| null|
|   4|        66|   66|   71|   85|  124| null| null| null| null|
|   5|        43|   43|   66|   71|   85|  124| null| null| null|
|   6|         6|    6|   43|   66|   71|   85|  124| null| null|
|   7|        12|   12|    6|   43|   66|   71|   85|  124| null|
+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+