Pyspark: create a lag column
I'm working in pyspark and have a table like the one below, table_1:
+--------+------------------+---------------+----------+-----+
| Number|Institution | datetime| date| time|
+--------+------------------+---------------+----------+-----+
|AE19075B| ABC| 7/20/2019 7:45|07/20/2019| 7:45|
|AE11688U| CBT|2/11/2019 20:31|02/11/2019|20:31|
+--------+------------------+---------------+----------+-----+
I want to add a time-lag column (15 minutes earlier) to table_1:
+--------+------------------+---------------+----------+-----+-----+
| Number|Institution | datetime| date| time|lag1 |
+--------+------------------+---------------+----------+-----+-----+
|AE19075B| ABC| 7/20/2019 7:45|07/20/2019| 7:45|7:30 |
|AE11688U| CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+------------------+---------------+----------+-----+-----+
from datetime import datetime, timedelta

table_2 = table_1.withColumn('lag1', (datetime.strptime(table_1['time'], '%H:%M') - timedelta(minutes=15)).strftime('%H:%M'))
The code above works on a plain string, but I don't understand why it fails when applied to the table. It raises the error 'TypeError: strptime() argument 1 must be str, not Column'. Is there any way to get a string out of a PySpark column? Thanks!
You can't apply a Python function directly to a Spark DataFrame column. Use Spark SQL functions instead, for example:
import pyspark.sql.functions as F

# Parse the time string, subtract 15 minutes, and format back to 'H:m'
df2 = df.withColumn(
    'lag1',
    F.expr("date_format(to_timestamp(time, 'H:m') - interval 15 minute, 'H:m')")
)
df2.show()
+--------+-----------+---------------+----------+-----+-----+
| Number|Institution| datetime| date| time| lag1|
+--------+-----------+---------------+----------+-----+-----+
|AE19075B| ABC| 7/20/2019 7:45|07/20/2019| 7:45| 7:30|
|AE11688U| CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+-----------+---------------+----------+-----+-----+
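For reference, if you'd rather stay in the DataFrame API than pass a string to F.expr, the same 15-minute shift can be written with epoch-second arithmetic. This is a sketch under the same assumptions about the input ('H:mm'-style strings like '7:45'), not part of the original answer:

import pyspark.sql.functions as F

# Sketch: parse to epoch seconds, subtract 15*60 seconds, format back.
# Note: times before 00:15 would wrap around to the previous day.
df2 = df.withColumn(
    'lag1',
    F.from_unixtime(F.unix_timestamp('time', 'H:mm') - 15 * 60, 'H:mm')
)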
Alternatively, you can call the Python function as a UDF (but performance should be worse than calling Spark SQL functions directly):
import pyspark.sql.functions as F
from datetime import datetime, timedelta

# Wrap the string-based Python logic in a UDF so Spark can apply it row by row
lag = F.udf(lambda t: (datetime.strptime(t, '%H:%M') - timedelta(minutes=15)).strftime('%H:%M'))
df2 = df.withColumn('lag1', lag('time'))
df2.show()
+--------+-----------+---------------+----------+-----+-----+
| Number|Institution| datetime| date| time| lag1|
+--------+-----------+---------------+----------+-----+-----+
|AE19075B| ABC| 7/20/2019 7:45|07/20/2019| 7:45|07:30|
|AE11688U| CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+-----------+---------------+----------+-----+-----+
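If you're on Spark 3.0+ with pyarrow installed, a pandas UDF is a middle ground: it keeps the Python logic but processes whole batches at once, which is usually faster than a plain row-by-row UDF. A sketch (the lag_15 name is illustrative, not from the answer above; output is zero-padded like the UDF's, e.g. 07:30):

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Vectorized over a batch: parse the whole Series, shift by 15 minutes, format back
@F.pandas_udf(StringType())
def lag_15(t: pd.Series) -> pd.Series:
    return (pd.to_datetime(t, format='%H:%M') - pd.Timedelta(minutes=15)).dt.strftime('%H:%M')

df2 = df.withColumn('lag1', lag_15('time'))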