Spark SQL Window 在两个指定时间边界之间的间隔内 - 在 3 小时和 2 小时之前

Spark SQL Window over interval of between two specified time boundaries - between 3 hours and 2 hours ago

使用两个预定义边界在 Spark SQL 中指定 window 间隔的正确方法是什么?

我正在尝试将 table 的值与 "between 3 hours ago and 2 hours ago" 的 window 相加。

当我运行这个查询时:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;

行得通。我得到了我期望的结果,即落入 2 小时滚动的值的总和 window.

现在,我需要的是滚动 window 不绑定到当前行,而是考虑 3 小时前到 2 小时前的行。 我试过:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;

但是我收到 extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'} 错误。

我也试过:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;

但是我得到了不同的错误 scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)

我尝试的第三个选项是:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;

它并没有像我们预期的那样工作:cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch

我很难找到间隔类型的文档,因为 this link 说的不够多,而且其他信息有点不完整。至少我发现了什么。

由于范围间隔不起作用,我不得不转向另一种方法。 它是这样的:

  • 准备需要执行计算的间隔列表
  • 对于每个间隔,运行 计算
    • 每次迭代都会产生一个数据框
  • 迭代后,我们有一个数据帧列表
  • 将列表中的数据帧合并为一个更大的数据帧
  • 写出结果

在我的例子中,我不得不 运行 计算一天中的每个小时,并将这些 "hourly" 结果(即 24 个数据帧的列表)合并为一个 "daily" , 数据框.

从非常高层次的角度来看,代码如下所示:

val hourlyDFs = for ((hourStart, hourEnd) <- (hoursToStart, hoursToEnd).zipped) yield {
    val data = data.where($"hour" <= lit(hourEnd) && $"hour" >= lit(hourStart))
    // do stuff
    // return a data frame
}
hourlyDFs.toSeq().reduce(_.union(_))

遇到了同样的问题并找到了一个简单的解决方案。给你:

unix_timestamp(datestamp) - unix_timestamp(datestamp) < 10800 --3 hours in seconds 

您也可以使用时间戳来提高可读性。 (不知道是否需要):

select unix_timestamp(date_format(current_timestamp, 'HH:mm:ss'), 'HH:mm:ss') <
       unix_timestamp('03:00:00', 'HH:mm:ss') --Used timestamp for readibility

获得相同结果的解决方法是计算最近 3 小时内的值的总和,然后减去最近 2 小时内的值的总和:

select *, 
sum(value) over (
     partition by a, b
     order by cast(time_value as timestamp)
     range between interval 3 hours preceding and current row) 
- 
sum(value) over (
     partition by a, b
     order by cast(time_value as timestamp)
     range between interval 2 hours preceding and current row) 
as sum_value
from my_temp_table;