pyspark 遍历 window 计算累积最大值

pyspark iterate through window calculate cumulative max

我有一个包含 vehicleID 、时间戳和里程表的数据框。一些里程表读数可能为空。我想创建一个新列,它是每个 vehicleID 时间戳的当前里程表,如果为空,则使用以前的 none 空里程表。

例子

+------------+------------------------+-----------+-------------------------+
|vehicleID   |startDateTimeUtc        |Odometer   |NewColumn-CurrentOdometer|
+------------+------------------------+-----------+-------------------------+
|a           |2019-04-11T16:27:32+0000|10000      |10000                    |
|a           |2019-04-11T16:27:32+0000|15000      |15000                    |
|a           |2019-04-11T16:43:10+0000|null       |15000                    |
|a           |2019-04-11T20:13:52+0000|null       |15000                    |
|a           |2019-04-12T14:50:35+0000|null       |15000                    |
|a           |2019-04-12T18:53:19+0000|20000      |20000                    |
|b           |2019-04-12T19:06:41+0000|350000     |350000                   |
|b           |2019-04-12T19:17:15+0000|370000     |370000                   |
|b           |2019-04-12T19:30:32+0000|null       |370000                   |
|b           |2019-04-12T20:19:41+0000|380000     |380000                   |
|b           |2019-04-12T20:42:26+0000|null       |380000                   |

我知道我需要使用 window 功能。我可能也需要使用“滞后”,但我如何才能不仅查找以前的记录?(参见示例 vehicleID a) 非常感谢!

my_window = Window.partitionBy("vehicleID").orderBy("vehicleID","startDateTimeUtc")

使用 last window functionignoreNulls 标志作为 TruerowsBetween unboundedPreceeding and currentRow.

df.show(20,False)
#+---------+------------------------+--------+
#|vehicleid|startdatetimeutc        |odometer|
#+---------+------------------------+--------+
#|a        |2019-04-11T16:27:32+0000|10000   |
#|a        |2019-04-11T16:27:32+0000|15000   |
#|a        |2019-04-11T16:43:10+0000|null    |
#|a        |2019-04-11T20:13:52+0000|null    |
#|a        |2019-04-12T14:50:35+0000|null    |
#|a        |2019-04-12T18:53:19+0000|20000   |
#|b        |2019-04-12T19:06:41+0000|350000  |
#|b        |2019-04-12T19:17:15+0000|370000  |
#|b        |2019-04-12T19:30:32+0000|null    |
#|b        |2019-04-12T20:19:41+0000|380000  |
#|b        |2019-04-12T20:42:26+0000|null    |
#+---------+------------------------+--------+

import sys
my_window = Window.partitionBy("vehicleID").orderBy("vehicleID","startDateTimeUtc").rowsBetween(-sys.maxsize,0)

df.withColumn("NewColumn-CurrentOdometer",last(col("Odometer"),True).over(my_window)).orderBy("vehicleid").show(20,False)
#+---------+------------------------+--------+-------------------------+
#|vehicleid|startdatetimeutc        |odometer|NewColumn-CurrentOdometer|
#+---------+------------------------+--------+-------------------------+
#|a        |2019-04-11T16:27:32+0000|10000   |10000                    |
#|a        |2019-04-11T16:27:32+0000|15000   |15000                    |
#|a        |2019-04-11T16:43:10+0000|null    |15000                    |
#|a        |2019-04-11T20:13:52+0000|null    |15000                    |
#|a        |2019-04-12T14:50:35+0000|null    |15000                    |
#|a        |2019-04-12T18:53:19+0000|20000   |20000                    |
#|b        |2019-04-12T19:06:41+0000|350000  |350000                   |
#|b        |2019-04-12T19:17:15+0000|370000  |370000                   |
#|b        |2019-04-12T19:30:32+0000|null    |370000                   |
#|b        |2019-04-12T20:19:41+0000|380000  |380000                   |
#|b        |2019-04-12T20:42:26+0000|null    |380000                   |
#+---------+------------------------+--------+-------------------------+

另一种选择- 使用 max 和 window 框架 unboundedpreceding and currentrow

加载提供的测试数据

   val data =
      """
        |vehicleID   |startDateTimeUtc        |Odometer
        |a           |2019-04-11T16:27:32+0000|10000
        |a           |2019-04-11T16:27:32+0000|15000
        |a           |2019-04-11T16:43:10+0000|null
        |a           |2019-04-11T20:13:52+0000|null
        |a           |2019-04-12T14:50:35+0000|null
        |a           |2019-04-12T18:53:19+0000|20000
        |b           |2019-04-12T19:06:41+0000|350000
        |b           |2019-04-12T19:17:15+0000|370000
        |b           |2019-04-12T19:30:32+0000|null
        |b           |2019-04-12T20:19:41+0000|380000
        |b           |2019-04-12T20:42:26+0000|null
      """.stripMargin
    val stringDS1 = data.split(System.lineSeparator())
      .map(_.split("\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()
    /**
      * +---------+------------------------+--------+
      * |vehicleID|startDateTimeUtc        |Odometer|
      * +---------+------------------------+--------+
      * |a        |2019-04-11T16:27:32+0000|10000   |
      * |a        |2019-04-11T16:27:32+0000|15000   |
      * |a        |2019-04-11T16:43:10+0000|null    |
      * |a        |2019-04-11T20:13:52+0000|null    |
      * |a        |2019-04-12T14:50:35+0000|null    |
      * |a        |2019-04-12T18:53:19+0000|20000   |
      * |b        |2019-04-12T19:06:41+0000|350000  |
      * |b        |2019-04-12T19:17:15+0000|370000  |
      * |b        |2019-04-12T19:30:32+0000|null    |
      * |b        |2019-04-12T20:19:41+0000|380000  |
      * |b        |2019-04-12T20:42:26+0000|null    |
      * +---------+------------------------+--------+
      *
      * root
      * |-- vehicleID: string (nullable = true)
      * |-- startDateTimeUtc: string (nullable = true)
      * |-- Odometer: integer (nullable = true)
      */

计算 window 中的最大值

   val w = Window.partitionBy("vehicleID").orderBy("startDateTimeUtc")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    df1.withColumn("NewColumn-CurrentOdometer",
      max("Odometer").over(w))
      .show(false)

    /**
      * +---------+------------------------+--------+-------------------------+
      * |vehicleID|startDateTimeUtc        |Odometer|NewColumn-CurrentOdometer|
      * +---------+------------------------+--------+-------------------------+
      * |a        |2019-04-11T16:27:32+0000|10000   |10000                    |
      * |a        |2019-04-11T16:27:32+0000|15000   |15000                    |
      * |a        |2019-04-11T16:43:10+0000|null    |15000                    |
      * |a        |2019-04-11T20:13:52+0000|null    |15000                    |
      * |a        |2019-04-12T14:50:35+0000|null    |15000                    |
      * |a        |2019-04-12T18:53:19+0000|20000   |20000                    |
      * |b        |2019-04-12T19:06:41+0000|350000  |350000                   |
      * |b        |2019-04-12T19:17:15+0000|370000  |370000                   |
      * |b        |2019-04-12T19:30:32+0000|null    |370000                   |
      * |b        |2019-04-12T20:19:41+0000|380000  |380000                   |
      * |b        |2019-04-12T20:42:26+0000|null    |380000                   |
      * +---------+------------------------+--------+-------------------------+
      */