如何根据 pyspark 数据框中列中的当前值计算未来值?

How to calculate future values based on current value in a column in pyspark dataframe?

我的问题如下。我正在尝试计算未来价值,比方说 pyspark 数据框中的积压值。

我的示例数据框是:

 Task     start_date      end_date  Total_salary   
Task1     2022-01-01    01-04-2022           500                
Task2     2022-03-01    2022-06-01           400                                
Task3     2019-11-01    2020-01-01           300   
Task3     2021-11-01    2022-04-01           600                       

预期输出:我需要计算从这个月到 直到 end_date 列中的最大日期。我如何获得多少报酬 月份是:Total_salary/Months 在 start_date 和 end_date 之间 我需要以下输出,因为这个 Jan/2022。我需要在单独的 datframe 中使用它 只有下面两列。

date              Total_backlog
2022-01-31        #(Task1: 500-100) +  (Task2: 300 ( because it didn't 
                  #started yet)) + (Task3: 0)( it's already finished)) + 
                  #(Task4: 600 - 300)  
                  #So total is : 400 + 400+ 0 + 300 = 1100

2022-02-28        800

2022-03-31        .....

.......
2022-06-31        .....

(这是end_date中的最大日期,但实际数据集这个日期比那个日期多)

我不知道如何遍历 pyspark 数据帧。有人可以帮助我吗?

使用此输入数据框:

df = spark.createDataFrame([
    ("Task1", "2022-01-01", "2022-04-01", 500),
    ("Task2", "2022-03-01", "2022-06-01", 400),
    ("Task3", "2019-11-01", "2020-01-01", 300),
    ("Task4", "2021-11-01", "2022-04-01", 600)
], ["Task", "start_date", "end_date", "Total_salary"])

首先,使用 sequence 函数生成 dates_df,如下所示:

# you can repalace '2022-01-01' by current_date truncated to month unit
dates_df = df.selectExpr(
    "sequence(date_trunc('mm', '2022-01-01'), date_trunc('mm', max(end_date)), interval 1 month) as dates"
).select(
    F.explode("dates").alias("date")
).withColumn(
    "date",
    F.last_day(F.col("date"))
).crossJoin(df.select("Task").distinct())

现在,在 date > end_date 上与原始数据框左连接并聚合总工资,在使用公式计算剩余工资后:

(Total_salary/nb_months_task) * nb_remaining_months_task

result = (dates_df.join(df, ["Task"], "left")
          .filter(F.col("end_date") > F.col("date"))
          .withColumn("salary_per_month",
                      F.round(F.col("Total_salary") / F.months_between("end_date", "start_date")))
          .withColumn("Total_salary", F.when(F.col("start_date") < F.col("date"),
                                             F.col("salary_per_month") * F.round(
                                                 F.months_between("end_date", "date"))
                                             ).otherwise(F.col("Total_salary")))
          .groupBy("date")
          .agg(F.sum("Total_salary").alias("Total_backlog"))
          ).orderBy("date")

result.show()
#+----------+-------------+
#|      date|Total_backlog|
#+----------+-------------+
#|2022-01-31|        974.0|
#|2022-02-28|        687.0|
#|2022-03-31|        266.0|
#|2022-04-30|        133.0|
#|2022-05-31|          0.0|
#|2022-06-30|         null|
#+----------+-------------+

如果实际逻辑不一样,您可以修改最后一部分。但是你明白了。