Pyspark:创建滞后列

Pyspark: create a lag column

我正在使用 pyspark 并获得如下 table,table_1

+--------+------------------+---------------+----------+-----+
|  Number|Institution       |       datetime|      date| time|
+--------+------------------+---------------+----------+-----+
|AE19075B|               ABC| 7/20/2019 7:45|07/20/2019| 7:45|
|AE11688U|               CBT|2/11/2019 20:31|02/11/2019|20:31|
+--------+------------------+---------------+----------+-----+

我想在 table_1

中添加时间滞后列(15 分钟)
+--------+------------------+---------------+----------+-----+-----+
|  Number|Institution       |       datetime|      date| time|lag1 |
+--------+------------------+---------------+----------+-----+-----+
|AE19075B|               ABC| 7/20/2019 7:45|07/20/2019| 7:45|7:30 |
|AE11688U|               CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+------------------+---------------+----------+-----+-----+
from datetime import datetime, timedelta

table_2 = table_.withColumn('lag1', (datetime.strptime(table1['time'], '%H:%M') -timedelta(minutes=15)).strftime('%H:%M'))

上面的代码可以应用于字符串,但我不知道为什么在这种情况下它不能应用于 table。它显示错误“'TypeError: strptime() argument 1 must be str, not Column'”,是否有任何方法可以从 Pyspark 的列中获取字符串?谢谢!

您不能直接在 Spark 数据框列上使用 Python 函数。您可以改用 Spark SQL 函数,如下所示:

import pyspark.sql.functions as F

df2 = df.withColumn(
    'lag1',
    F.expr("date_format(to_timestamp(time, 'H:m') - interval 15 minute, 'H:m')")
)

df2.show()
+--------+-----------+---------------+----------+-----+-----+
|  Number|Institution|       datetime|      date| time| lag1|
+--------+-----------+---------------+----------+-----+-----+
|AE19075B|        ABC| 7/20/2019 7:45|07/20/2019| 7:45| 7:30|
|AE11688U|        CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+-----------+---------------+----------+-----+-----+

或者,您可以将 Python 函数作为 UDF 调用(但性能应该比直接调用 Spark SQL 函数差):

import pyspark.sql.functions as F
from datetime import datetime, timedelta

lag = F.udf(lambda t: (datetime.strptime(t, '%H:%M') -timedelta(minutes=15)).strftime('%H:%M'))

df2 = df.withColumn('lag1', lag('time'))
df2.show()
+--------+-----------+---------------+----------+-----+-----+
|  Number|Institution|       datetime|      date| time| lag1|
+--------+-----------+---------------+----------+-----+-----+
|AE19075B|        ABC| 7/20/2019 7:45|07/20/2019| 7:45|07:30|
|AE11688U|        CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+-----------+---------------+----------+-----+-----+