Spark SQL sliding window difference computation
How can I compute a sliding window in Spark without resorting to Spark Streaming?
Note: I do not want to use WINDOW PARTITION BY ORDER BY with k ROWS before/after the current row, but a window based on timestamps. The window operator works like this:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([{"a": "x", "b": "2021-02-21 01:00:00", "c": "3"},
                            {"a": "x", "b": "2021-02-21 02:00:00", "c": "4"},
                            {"a": "x", "b": "2021-02-21 03:00:00", "c": "2"}])

hour_interval = "4 hour"      # total window length
sliding_window = "60 minute"  # slide step

# No startTime offset, so bucket boundaries align on whole hours,
# matching the output shown below.
df_aggregated_time_window = df.groupBy(
    "a",
    window("b", windowDuration=hour_interval, slideDuration=sliding_window)
).agg(min("c").alias("min_c"))

df_aggregated_time_window.show(truncate=False)
+---+------------------------------------------+-----+
|a |window |min_c|
+---+------------------------------------------+-----+
|x |[2021-02-20 23:00:00, 2021-02-21 03:00:00]|3 |
|x |[2021-02-21 00:00:00, 2021-02-21 04:00:00]|2 |
|x |[2021-02-20 22:00:00, 2021-02-21 02:00:00]|3 |
|x |[2021-02-21 02:00:00, 2021-02-21 06:00:00]|2 |
|x |[2021-02-21 01:00:00, 2021-02-21 05:00:00]|2 |
|x |[2021-02-21 03:00:00, 2021-02-21 07:00:00]|2 |
+---+------------------------------------------+-----+
What I want is one output row for each of the 3 input rows, returning the sliding delta over a 4-hour time-based window (= the state), advancing one hour at a time, i.e. triggered hourly (though since this is a batch job, the streaming trigger should not matter much).
Instead, I get the result above, whose cardinality is greater than the desired number of rows.
Edit
Desired output:
Input:
x,2021-02-21 01:00:00,3
x,2021-02-21 02:00:00,4
x,2021-02-21 03:00:00,4
x,2021-02-21 04:00:00,1
Output:
x,2021-02-21 01:00:00,NULL // no previous record exists in the previous 3 hours (including self)
x,2021-02-21 02:00:00,3    // we currently compute only `min` for simplicity (later it should be max - min to get the deltas); within the last 3 hours the minimum is 3 (coincidentally the previous row)
x,2021-02-21 03:00:00,3    // within the 4-hour window, 3 is still the smallest
x,2021-02-21 04:00:00,1    // within the previous <= 3 hours (including self), 1 is the smallest
I'm afraid your assumption about the window expression is incorrect. According to its documentation here:
def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column
Bucketize rows into one or more time windows given a timestamp specifying column.
Window starts are inclusive but the window ends are exclusive, e.g.
12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).
...
Therefore, with your 4-hour window and 1-hour slide step, there are 6 buckets to which the aggregation is applied:
[2021-02-20 22:00:00, 2021-02-21 02:00:00) <-- first bucket that contains the earliest b = 2021-02-21 01:00:00
[2021-02-20 23:00:00, 2021-02-21 03:00:00)
[2021-02-21 00:00:00, 2021-02-21 04:00:00)
[2021-02-21 01:00:00, 2021-02-21 05:00:00)
[2021-02-21 02:00:00, 2021-02-21 06:00:00)
[2021-02-21 03:00:00, 2021-02-21 07:00:00) <-- last bucket that contains the latest b = 2021-02-21 03:00:00
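To see the bucketization concretely, one can list which input rows land in each bucket (a sketch reusing the df from the question; collect_list is only there for inspection). Each row is replicated into windowDuration / slideDuration = 4 buckets, which is why 3 input rows spread over 6 distinct windows:
from pyspark.sql.functions import collect_list, min, window

(df.groupBy("a", window("b", "4 hour", "60 minute"))
   .agg(collect_list("b").alias("rows_in_bucket"),  # input timestamps per bucket
        min("c").alias("min_c"))
   .orderBy("window")
   .show(truncate=False))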
I don't fully understand the note "I do not want to use WINDOW PARTITION BY ORDER BY...", because that is exactly what would satisfy your requirement more efficiently: one output row per input row, computed as the state over the current hour and the previous 3 hours.
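For illustration, here is a minimal sketch of that range-based window (my reading of the requirement, not code from the question): order by the epoch seconds of b and use a range frame spanning the previous 3 hours up to the current row. The NULL in the first desired row is interpreted here as "no record earlier than the current one exists within the frame", modeled with a count over the same frame:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import count, min, unix_timestamp, when

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("x", "2021-02-21 01:00:00", 3),
                            ("x", "2021-02-21 02:00:00", 4),
                            ("x", "2021-02-21 03:00:00", 4),
                            ("x", "2021-02-21 04:00:00", 1)],
                           ["a", "b", "c"])

# Range frame over epoch seconds: the current row plus everything up to
# 3 hours earlier, i.e. a 4-hour state that advances with every row.
w = (Window.partitionBy("a")
           .orderBy(unix_timestamp("b"))
           .rangeBetween(-3 * 3600, Window.currentRow))

result = df.withColumn(
    "min_c",
    # NULL when the current row is alone in its frame, as in the desired output
    when(count("c").over(w) > 1, min("c").over(w))
)
result.show(truncate=False)
# Expected: 01:00 -> null, 02:00 -> 3, 03:00 -> 3, 04:00 -> 1
The eventual delta then becomes max("c").over(w) - min("c").over(w) over the same frame.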