Pyspark:如何编写复杂的数据框计算铅总和

Pyspark : how to code complicated dataframe calculation lead sum

我给出的数据框看起来像这样。 此数据框按日期排序,col1 只是一些随机值。

    TEST_schema = StructType([StructField("date", StringType(), True),\
                              StructField("col1", IntegerType(), True),\
                             ])
    TEST_data = [('2020-08-01',3),('2020-08-02',1),('2020-08-03',-1),('2020-08-04',-1),('2020-08-05',3),\
                 ('2020-08-06',-1),('2020-08-07',6),('2020-08-08',4),('2020-08-09',5)]
    rdd3 = sc.parallelize(TEST_data)
    TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
    TEST_df.show() 
    
+----------+----+
|      date|col1|
+----------+----+
|2020-08-01|   3|
|2020-08-02|   1|
|2020-08-03|  -1|
|2020-08-04|  -1|
|2020-08-05|   3|
|2020-08-06|  -1|
|2020-08-07|   6|
|2020-08-08|   4|
|2020-08-09|   5|
+----------+----+

LOGIC : lead(col1) +1,如果 col1 ==-1,则从之前的值 lead(col1) +2...
结果数据框将如下所示(想要的列是我想要的输出)

+----------+----+----+
|      date|col1|WANT|
+----------+----+----+
|2020-08-01|   3|   2|
|2020-08-02|   1|   6|
|2020-08-03|  -1|   5|
|2020-08-04|  -1|   4|
|2020-08-05|   3|   8|
|2020-08-06|  -1|   7|
|2020-08-07|   6|   5|
|2020-08-08|   4|   6|
|2020-08-09|   5|  -1|
+----------+----+----+

让我们看最后一行,其中col1==5,即5是leaded +1 want==6 (2020-08-08) 如果我们有 col==-1,那么我们再添加 +1,如果我们有 col==-1 重复两次,那么我们再添加 +2.. 这很难用语言解释,最后因为它创建了最后一列而不是 null,所以用 -1 替换。我有图

您可以检查以下代码和逻辑是否适合您:

  1. 创建一个 sub-group 标签 g,它取 运行 和 int(col1!=-1),我们只关心 col1 == -1 的行,并使所有其他行无效行。
  2. 残差为 1,如果 col1 == -1,加上 运行 计数 Window w2
  3. 将 prev_col1 置于 w1 之上,后者不是 -1(使用 nullif),(prev_col1 的命名可能会造成混淆,因为它只需要 col1 = -1 使用典型的 pyspark 的方式做 ffill,否则保持原来的方式)。
  4. set val = prev_col1 + residual,取滞后并将null设置为-1

代码如下:

from pyspark.sql.functions import when, col, expr, count, desc, lag, coalesce    
from pyspark.sql import Window

w1 = Window.orderBy(desc('date'))
w2 = Window.partitionBy('g').orderBy(desc('date'))  

TEST_df.withColumn('g', when(col('col1') == -1, expr("sum(int(col1!=-1))").over(w1))) \
    .withColumn('residual', when(col('col1') == -1, count('*').over(w2) + 1).otherwise(1)) \
    .withColumn('prev_col1',expr("last(nullif(col1,-1),True)").over(w1)) \
    .withColumn('want', coalesce(lag(expr("prev_col1 + residual")).over(w1),lit(-1))) \
    .orderBy('date').show()
+----------+----+----+--------+---------+----+
|      date|col1|   g|residual|prev_col1|want|
+----------+----+----+--------+---------+----+
|2020-08-01|   3|null|       1|        3|   2|
|2020-08-02|   1|null|       1|        1|   6|
|2020-08-03|  -1|   4|       3|        3|   5|
|2020-08-04|  -1|   4|       2|        3|   4|
|2020-08-05|   3|null|       1|        3|   8|
|2020-08-06|  -1|   3|       2|        6|   7|
|2020-08-07|   6|null|       1|        6|   5|
|2020-08-08|   4|null|       1|        4|   6|
|2020-08-09|   5|null|       1|        5|  -1|
+----------+----+----+--------+---------+----+