Pyspark 中的滞后移位函数
lag shift Funtion in Pyspark
我想在 1 到 7 的范围内移动订单数量。这是我的 python 代码 :
def make_features(data, max_lag):
for lag in range(1, max_lag + 1):
data['lag_{}'.format(lag)] = data['num_orders'].shift(lag)
make_features(df, 7)
我尝试在 Pyspark 中做同样的思考:代码:
def make_features(data, max_lag):
for lag in range(1, max_lag + 1):
data['lag_{}'.format(lag)] = data['num_orders'].shift(lag)
make_features(df, 7)
我得到这个错误:
TypeError: 'int' object is not callable
Traceback (most recent call last):
TypeError: 'int' object is not callable
我也试试这个 code :
for lag in range(1, 8):
window = Window.orderBy("date")
lagCol = lag(col("num_orders"), n).over(window)
df.withColumn(f"LagCol_{n}", lagCol)
这只是移动了 1 个单位:
预期结果:
在 PySpark 中,没有您预期的 shift
函数,您在使用 lag
时方向正确。但是当你必须根据 lag_1
做 lag_2
等等时,这里有一个小技巧。
from pyspark.sql import functions as F
from pyspark.sql import Window as W
df = df.withColumn('lag_0', F.col('num_orders'))
for lag in range(1, 8):
df = (df
.withColumn(f'lag_{lag}', F
.lag(f'lag_{lag - 1}')
.over(W
.partitionBy(F.lit(1))
.orderBy('date')
)
)
)
+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+
|date|num_orders|lag_0|lag_1|lag_2|lag_3|lag_4|lag_5|lag_6|lag_7|
+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+
| 1| 124| 124| null| null| null| null| null| null| null|
| 2| 85| 85| 124| null| null| null| null| null| null|
| 3| 71| 71| 85| 124| null| null| null| null| null|
| 4| 66| 66| 71| 85| 124| null| null| null| null|
| 5| 43| 43| 66| 71| 85| 124| null| null| null|
| 6| 6| 6| 43| 66| 71| 85| 124| null| null|
| 7| 12| 12| 6| 43| 66| 71| 85| 124| null|
+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+
我想在 1 到 7 的范围内移动订单数量。这是我的 python 代码 :
def make_features(data, max_lag):
for lag in range(1, max_lag + 1):
data['lag_{}'.format(lag)] = data['num_orders'].shift(lag)
make_features(df, 7)
我尝试在 Pyspark 中做同样的思考:代码:
def make_features(data, max_lag):
for lag in range(1, max_lag + 1):
data['lag_{}'.format(lag)] = data['num_orders'].shift(lag)
make_features(df, 7)
我得到这个错误:
TypeError: 'int' object is not callable
Traceback (most recent call last):
TypeError: 'int' object is not callable
我也试试这个 code :
for lag in range(1, 8):
window = Window.orderBy("date")
lagCol = lag(col("num_orders"), n).over(window)
df.withColumn(f"LagCol_{n}", lagCol)
这只是移动了 1 个单位:
预期结果:
在 PySpark 中,没有您预期的 shift
函数,您在使用 lag
时方向正确。但是当你必须根据 lag_1
做 lag_2
等等时,这里有一个小技巧。
from pyspark.sql import functions as F
from pyspark.sql import Window as W
df = df.withColumn('lag_0', F.col('num_orders'))
for lag in range(1, 8):
df = (df
.withColumn(f'lag_{lag}', F
.lag(f'lag_{lag - 1}')
.over(W
.partitionBy(F.lit(1))
.orderBy('date')
)
)
)
+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+
|date|num_orders|lag_0|lag_1|lag_2|lag_3|lag_4|lag_5|lag_6|lag_7|
+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+
| 1| 124| 124| null| null| null| null| null| null| null|
| 2| 85| 85| 124| null| null| null| null| null| null|
| 3| 71| 71| 85| 124| null| null| null| null| null|
| 4| 66| 66| 71| 85| 124| null| null| null| null|
| 5| 43| 43| 66| 71| 85| 124| null| null| null|
| 6| 6| 6| 43| 66| 71| 85| 124| null| null|
| 7| 12| 12| 6| 43| 66| 71| 85| 124| null|
+----+----------+-----+-----+-----+-----+-----+-----+-----+-----+