需要在 pyspark 中的两个日期列之间添加日期范围吗?
Need to add date ranges between two date columns in pyspark?
我输入的 pyspark 数据框包含 ID、StartDatetime、EndDatetime 等列。我想根据开始日期时间和结束日期时间添加名为 newdate 的新列。
输入方向:-
ID StartDatetime EndDatetime
1 21-06-2021 07:00 24-06-2021 16:00
2 21-06-2021 07:00 22-06-2021 16:00
要求输出:-
ID StartDatetime EndDatetime newdate
1 21-06-2021 07:00 24-06-2021 16:00 21-06-2021
1 21-06-2021 07:00 24-06-2021 16:00 22-06-2021
1 21-06-2021 07:00 24-06-2021 16:00 23-06-2021
1 21-06-2021 07:00 24-06-2021 16:00 24-06-2021
2 21-06-2021 07:00 22-06-2021 16:00 21-06-2021
2 21-06-2021 07:00 22-06-2021 16:00 22-06-2021
您可以使用 explode
和 array_repeat
来复制行。
我使用 row_number
和 date
函数的组合来获取 start
和 end
日期之间的日期范围:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy("ID").orderBy('StartDatetime')
output_df = df.withColumn("diff", 1+F.datediff(F.to_date(F.unix_timestamp('EndDatetime', 'dd-MM-yyyy HH:mm').cast('timestamp')), \
F.to_date(F.unix_timestamp('StartDatetime', 'dd-MM-yyyy HH:mm').cast('timestamp'))))\
.withColumn('diff', F.expr('explode(array_repeat(diff,int(diff)))'))\
.withColumn("diff", F.row_number().over(w))\
.withColumn("start_dt", F.to_date(F.unix_timestamp('StartDatetime', 'dd-MM-yyyy HH:mm').cast('timestamp')))\
.withColumn("newdate", F.date_format(F.expr("date_add(start_dt, diff-1)"), 'dd-MM-yyyy')).drop('diff', 'start_dt')
输出:
output_df.orderBy("ID", "newdate").show()
+---+----------------+----------------+----------+
| ID| StartDatetime| EndDatetime| newdate|
+---+----------------+----------------+----------+
| 1|21-06-2021 07:00|24-06-2021 16:00|21-06-2021|
| 1|21-06-2021 07:00|24-06-2021 16:00|22-06-2021|
| 1|21-06-2021 07:00|24-06-2021 16:00|23-06-2021|
| 1|21-06-2021 07:00|24-06-2021 16:00|24-06-2021|
| 2|21-06-2021 07:00|22-06-2021 16:00|21-06-2021|
| 2|21-06-2021 07:00|22-06-2021 16:00|22-06-2021|
+---+----------------+----------------+----------+
我删除了 diff
列,但显示它可以帮助您在不清楚的情况下理解逻辑。
我输入的 pyspark 数据框包含 ID、StartDatetime、EndDatetime 等列。我想根据开始日期时间和结束日期时间添加名为 newdate 的新列。
输入方向:-
ID StartDatetime EndDatetime
1 21-06-2021 07:00 24-06-2021 16:00
2 21-06-2021 07:00 22-06-2021 16:00
要求输出:-
ID StartDatetime EndDatetime newdate
1 21-06-2021 07:00 24-06-2021 16:00 21-06-2021
1 21-06-2021 07:00 24-06-2021 16:00 22-06-2021
1 21-06-2021 07:00 24-06-2021 16:00 23-06-2021
1 21-06-2021 07:00 24-06-2021 16:00 24-06-2021
2 21-06-2021 07:00 22-06-2021 16:00 21-06-2021
2 21-06-2021 07:00 22-06-2021 16:00 22-06-2021
您可以使用 explode
和 array_repeat
来复制行。
我使用 row_number
和 date
函数的组合来获取 start
和 end
日期之间的日期范围:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy("ID").orderBy('StartDatetime')
output_df = df.withColumn("diff", 1+F.datediff(F.to_date(F.unix_timestamp('EndDatetime', 'dd-MM-yyyy HH:mm').cast('timestamp')), \
F.to_date(F.unix_timestamp('StartDatetime', 'dd-MM-yyyy HH:mm').cast('timestamp'))))\
.withColumn('diff', F.expr('explode(array_repeat(diff,int(diff)))'))\
.withColumn("diff", F.row_number().over(w))\
.withColumn("start_dt", F.to_date(F.unix_timestamp('StartDatetime', 'dd-MM-yyyy HH:mm').cast('timestamp')))\
.withColumn("newdate", F.date_format(F.expr("date_add(start_dt, diff-1)"), 'dd-MM-yyyy')).drop('diff', 'start_dt')
输出:
output_df.orderBy("ID", "newdate").show()
+---+----------------+----------------+----------+
| ID| StartDatetime| EndDatetime| newdate|
+---+----------------+----------------+----------+
| 1|21-06-2021 07:00|24-06-2021 16:00|21-06-2021|
| 1|21-06-2021 07:00|24-06-2021 16:00|22-06-2021|
| 1|21-06-2021 07:00|24-06-2021 16:00|23-06-2021|
| 1|21-06-2021 07:00|24-06-2021 16:00|24-06-2021|
| 2|21-06-2021 07:00|22-06-2021 16:00|21-06-2021|
| 2|21-06-2021 07:00|22-06-2021 16:00|22-06-2021|
+---+----------------+----------------+----------+
我删除了 diff
列,但显示它可以帮助您在不清楚的情况下理解逻辑。