如何根据 pyspark 数据框中列中的当前值计算未来值?
How to calculate future values based on current value in a column in pyspark dataframe?
我的问题如下。我正在尝试计算未来价值,比方说 pyspark 数据框中的积压值。
我的示例数据框是:
Task start_date end_date Total_salary
Task1 2022-01-01 01-04-2022 500
Task2 2022-03-01 2022-06-01 400
Task3 2019-11-01 2020-01-01 300
Task3 2021-11-01 2022-04-01 600
预期输出:我需要计算从这个月到
直到 end_date 列中的最大日期。我如何获得多少报酬
月份是:Total_salary/Months 在 start_date 和 end_date 之间
我需要以下输出,因为这个 Jan/2022。我需要在单独的 datframe 中使用它
只有下面两列。
date Total_backlog
2022-01-31 #(Task1: 500-100) + (Task2: 300 ( because it didn't
#started yet)) + (Task3: 0)( it's already finished)) +
#(Task4: 600 - 300)
#So total is : 400 + 400+ 0 + 300 = 1100
2022-02-28 800
2022-03-31 .....
.......
2022-06-31 .....
(这是end_date
中的最大日期,但实际数据集这个日期比那个日期多)
我不知道如何遍历 pyspark 数据帧。有人可以帮助我吗?
使用此输入数据框:
df = spark.createDataFrame([
("Task1", "2022-01-01", "2022-04-01", 500),
("Task2", "2022-03-01", "2022-06-01", 400),
("Task3", "2019-11-01", "2020-01-01", 300),
("Task4", "2021-11-01", "2022-04-01", 600)
], ["Task", "start_date", "end_date", "Total_salary"])
首先,使用 sequence
函数生成 dates_df
,如下所示:
# you can repalace '2022-01-01' by current_date truncated to month unit
dates_df = df.selectExpr(
"sequence(date_trunc('mm', '2022-01-01'), date_trunc('mm', max(end_date)), interval 1 month) as dates"
).select(
F.explode("dates").alias("date")
).withColumn(
"date",
F.last_day(F.col("date"))
).crossJoin(df.select("Task").distinct())
现在,在 date > end_date
上与原始数据框左连接并聚合总工资,在使用公式计算剩余工资后:
(Total_salary/nb_months_task) * nb_remaining_months_task
result = (dates_df.join(df, ["Task"], "left")
.filter(F.col("end_date") > F.col("date"))
.withColumn("salary_per_month",
F.round(F.col("Total_salary") / F.months_between("end_date", "start_date")))
.withColumn("Total_salary", F.when(F.col("start_date") < F.col("date"),
F.col("salary_per_month") * F.round(
F.months_between("end_date", "date"))
).otherwise(F.col("Total_salary")))
.groupBy("date")
.agg(F.sum("Total_salary").alias("Total_backlog"))
).orderBy("date")
result.show()
#+----------+-------------+
#| date|Total_backlog|
#+----------+-------------+
#|2022-01-31| 974.0|
#|2022-02-28| 687.0|
#|2022-03-31| 266.0|
#|2022-04-30| 133.0|
#|2022-05-31| 0.0|
#|2022-06-30| null|
#+----------+-------------+
如果实际逻辑不一样,您可以修改最后一部分。但是你明白了。
我的问题如下。我正在尝试计算未来价值,比方说 pyspark 数据框中的积压值。
我的示例数据框是:
Task start_date end_date Total_salary
Task1 2022-01-01 01-04-2022 500
Task2 2022-03-01 2022-06-01 400
Task3 2019-11-01 2020-01-01 300
Task3 2021-11-01 2022-04-01 600
预期输出:我需要计算从这个月到 直到 end_date 列中的最大日期。我如何获得多少报酬 月份是:Total_salary/Months 在 start_date 和 end_date 之间 我需要以下输出,因为这个 Jan/2022。我需要在单独的 datframe 中使用它 只有下面两列。
date Total_backlog
2022-01-31 #(Task1: 500-100) + (Task2: 300 ( because it didn't
#started yet)) + (Task3: 0)( it's already finished)) +
#(Task4: 600 - 300)
#So total is : 400 + 400+ 0 + 300 = 1100
2022-02-28 800
2022-03-31 .....
.......
2022-06-31 .....
(这是end_date
中的最大日期,但实际数据集这个日期比那个日期多)
我不知道如何遍历 pyspark 数据帧。有人可以帮助我吗?
使用此输入数据框:
df = spark.createDataFrame([
("Task1", "2022-01-01", "2022-04-01", 500),
("Task2", "2022-03-01", "2022-06-01", 400),
("Task3", "2019-11-01", "2020-01-01", 300),
("Task4", "2021-11-01", "2022-04-01", 600)
], ["Task", "start_date", "end_date", "Total_salary"])
首先,使用 sequence
函数生成 dates_df
,如下所示:
# you can repalace '2022-01-01' by current_date truncated to month unit
dates_df = df.selectExpr(
"sequence(date_trunc('mm', '2022-01-01'), date_trunc('mm', max(end_date)), interval 1 month) as dates"
).select(
F.explode("dates").alias("date")
).withColumn(
"date",
F.last_day(F.col("date"))
).crossJoin(df.select("Task").distinct())
现在,在 date > end_date
上与原始数据框左连接并聚合总工资,在使用公式计算剩余工资后:
(Total_salary/nb_months_task) * nb_remaining_months_task
result = (dates_df.join(df, ["Task"], "left")
.filter(F.col("end_date") > F.col("date"))
.withColumn("salary_per_month",
F.round(F.col("Total_salary") / F.months_between("end_date", "start_date")))
.withColumn("Total_salary", F.when(F.col("start_date") < F.col("date"),
F.col("salary_per_month") * F.round(
F.months_between("end_date", "date"))
).otherwise(F.col("Total_salary")))
.groupBy("date")
.agg(F.sum("Total_salary").alias("Total_backlog"))
).orderBy("date")
result.show()
#+----------+-------------+
#| date|Total_backlog|
#+----------+-------------+
#|2022-01-31| 974.0|
#|2022-02-28| 687.0|
#|2022-03-31| 266.0|
#|2022-04-30| 133.0|
#|2022-05-31| 0.0|
#|2022-06-30| null|
#+----------+-------------+
如果实际逻辑不一样,您可以修改最后一部分。但是你明白了。