Spark SQL sliding window difference computation

How can I compute a sliding window in Spark without resorting to Spark Streaming?

Note: I do not want a WINDOW PARTITION BY ORDER BY frame of k ROWS preceding/following the current row, but one based on timestamps. The window operator follows this pattern:

from pyspark.sql import SparkSession
from pyspark.sql.functions import min, window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([{"a": "x", "b": "2021-02-21 01:00:00", "c": "3"},
                            {"a": "x", "b": "2021-02-21 02:00:00", "c": "4"},
                            {"a": "x", "b": "2021-02-21 03:00:00", "c": "2"}])

hour_interval = "4 hour"
sliding_window = "60 minute"

# bucketize on timestamp column "b": 4-hour windows sliding every 60 minutes
df_aggregated_time_window = (df
    .groupBy("a", window("b", windowDuration=hour_interval,
                         slideDuration=sliding_window, startTime="30 minute"))
    .agg(min("c").alias("min_c")))
df_aggregated_time_window.show(truncate=False)

+---+------------------------------------------+-----+
|a  |window                                    |min_c|
+---+------------------------------------------+-----+
|x  |[2021-02-20 23:00:00, 2021-02-21 03:00:00]|3    |
|x  |[2021-02-21 00:00:00, 2021-02-21 04:00:00]|2    |
|x  |[2021-02-20 22:00:00, 2021-02-21 02:00:00]|3    |
|x  |[2021-02-21 02:00:00, 2021-02-21 06:00:00]|2    |
|x  |[2021-02-21 01:00:00, 2021-02-21 05:00:00]|2    |
|x  |[2021-02-21 03:00:00, 2021-02-21 07:00:00]|2    |
+---+------------------------------------------+-----+

For the 3 input rows I want exactly 3 output rows, each returning the sliding delta computed over a 4-hour time-based window (= the state), advancing by one hour and triggered every hour (though since this is batch processing, the streaming trigger should matter less).

Instead, I get the result above, whose cardinality is greater than the desired number of rows.

Edit

Desired output:

Input:

x,2021-02-21 01:00:00,3
x,2021-02-21 02:00:00,4
x,2021-02-21 03:00:00,4
x,2021-02-21 04:00:00,1

Output:

x,2021-02-21 01:00:00, NULL // no single previous record to be found in the previous 3 hours (including self)
x,2021-02-21 02:00:00, 3    // as we are currently only computing `min` for simplicity (later it should be max - min to see the deltas), within the last 3 hours the value is 3 (coincidentally the previous row)
x,2021-02-21 03:00:00, 3    // within the 4-hour window, 3 is still the smallest
x,2021-02-21 04:00:00, 1    // within the previous <= 3 hours (including self), 1 is the smallest

I'm afraid your assumption about the window expression is incorrect. From its documentation here:

def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05). ...

Therefore, with your 4-hour window and 1-hour slide step, there are 6 buckets over which the aggregation is applied (a quick way to inspect them is sketched after the list):

[2021-02-20 22:00:00, 2021-02-21 02:00:00)  <-- first bucket that contains the earliest b = 2021-02-21 01:00:00
[2021-02-20 23:00:00, 2021-02-21 03:00:00)
[2021-02-21 00:00:00, 2021-02-21 04:00:00)
[2021-02-21 01:00:00, 2021-02-21 05:00:00)
[2021-02-21 02:00:00, 2021-02-21 06:00:00)
[2021-02-21 03:00:00, 2021-02-21 07:00:00) <-- last bucket that contains the latest b = 2021-02-21 03:00:00
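
To see those buckets explicitly, here is a small sketch (reusing df_aggregated_time_window from your snippet) that projects the start and end fields of the window struct column produced by the groupBy:

from pyspark.sql.functions import col

# one row per bucket, ordered by bucket start
(df_aggregated_time_window
 .select(col("window.start").alias("window_start"),
         col("window.end").alias("window_end"),
         "min_c")
 .orderBy("window_start")
 .show(truncate=False))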

I don't entirely follow the "I don't want to use WINDOW PARTITION BY ORDER BY..." remark, because that is precisely what would let you meet your requirement more efficiently: one output row per input row, computed over the state of the current hour plus the previous 3 hours.
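
For illustration, a minimal sketch of that range-based approach (not part of the original answer; the 3-hour look-back, the inclusion of the current row, and the column name min_c_last_4h are assumptions you may need to adjust, e.g. with this frame the very first row gets its own value rather than NULL):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, min, unix_timestamp

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("x", "2021-02-21 01:00:00", 3),
                            ("x", "2021-02-21 02:00:00", 4),
                            ("x", "2021-02-21 03:00:00", 4),
                            ("x", "2021-02-21 04:00:00", 1)],
                           ["a", "b", "c"])

# range frame over epoch seconds: the current row plus the preceding 3 hours
w = (Window.partitionBy("a")
     .orderBy(unix_timestamp(col("b")))
     .rangeBetween(-3 * 3600, Window.currentRow))

# yields one output row per input row, unlike the bucketized window() aggregation
df.withColumn("min_c_last_4h", min("c").over(w)).show(truncate=False)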