Spark 1.5.2:在时间范围内对 DataFrame 行进行分组
Spark 1.5.2: Grouping DataFrame Rows over a Time Range
我有一个具有以下架构的 df
:
ts: TimestampType
key: int
val: int
df
按ts
升序排列。从 row(0) 开始,我想在特定时间间隔内对数据帧进行分组。
例如,如果我说 df.filter(row(0).ts + expr(INTERVAL 24 HOUR)).collect()
,它应该 return 24 小时内的所有行 window 行 (0)。
有没有办法在 Spark DF 上下文中实现上述目标?
总的来说是比较简单的任务。您所需要的只是 UNIX 时间戳的基本算术。首先让我们将所有时间戳转换为数字:
val dfNum = df.withColumn("ts", $"timestamp".cast("long"))
接下来让我们找到所有行的最小时间戳:
val offset = dfNum.agg(min($"ts")).first.getLong(0)
并用它来计算组:
val aDay = lit(60 * 60 * 24)
val group = (($"ts" - lit(offset)) / aDay).cast("long")
val dfWithGroups = dfNum.withColumn("group", group)
您终于可以将其用作分组列了:
dfWithGroups.groupBy($"group").agg(min($"value")).
如果您想要有意义的间隔(可解释为时间戳),只需将组乘以 aDay
。
显然这不会处理复杂的情况,例如处理夏令时或闰秒,但在大多数情况下应该足够好。如果您需要正确处理其中任何一个,您可以使用类似的逻辑,将 Joda 时间与 UDF 结合使用。
我有一个具有以下架构的 df
:
ts: TimestampType
key: int
val: int
df
按ts
升序排列。从 row(0) 开始,我想在特定时间间隔内对数据帧进行分组。
例如,如果我说 df.filter(row(0).ts + expr(INTERVAL 24 HOUR)).collect()
,它应该 return 24 小时内的所有行 window 行 (0)。
有没有办法在 Spark DF 上下文中实现上述目标?
总的来说是比较简单的任务。您所需要的只是 UNIX 时间戳的基本算术。首先让我们将所有时间戳转换为数字:
val dfNum = df.withColumn("ts", $"timestamp".cast("long"))
接下来让我们找到所有行的最小时间戳:
val offset = dfNum.agg(min($"ts")).first.getLong(0)
并用它来计算组:
val aDay = lit(60 * 60 * 24)
val group = (($"ts" - lit(offset)) / aDay).cast("long")
val dfWithGroups = dfNum.withColumn("group", group)
您终于可以将其用作分组列了:
dfWithGroups.groupBy($"group").agg(min($"value")).
如果您想要有意义的间隔(可解释为时间戳),只需将组乘以 aDay
。
显然这不会处理复杂的情况,例如处理夏令时或闰秒,但在大多数情况下应该足够好。如果您需要正确处理其中任何一个,您可以使用类似的逻辑,将 Joda 时间与 UDF 结合使用。