Spark Structured Streaming groupby window - I want the first interval to start at the first timestamp
From a simple, complete example of window aggregation on Spark 2.3.1 (HDP 3.0), I can see that Spark creates intervals aligned to a round boundary. For example, here I specify a windowDuration of 60 seconds, and Spark starts the first interval at the nearest minute:
>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1),("2016-03-11 09:00:08", 1)]).toDF("date", "val")
>>> w = df.groupBy(F.window("date", "60 seconds")).agg(F.sum("val").alias("sum"))
>>> w.select(w.window.start.cast("string").alias("start"),w.window.end.cast("string").alias("end"), "sum").collect()
[Row(start='2016-03-11 09:00:00', end='2016-03-11 09:01:00', sum=2)]
Is there a way to start the interval at the first message? I.e., in my case I want:
[Row(start='2016-03-11 09:00:07', end='2016-03-11 09:01:07', sum=2)]
Here you go =>
from pyspark.sql import functions as F
from datetime import datetime

df = spark.createDataFrame([("2016-03-11 09:00:07", 1), ("2016-03-11 09:00:08", 1)]).toDF("date", "val")

# Seconds component of the first row's timestamp, used to shift the window grid
startSecond = datetime.strptime(df.head()[0], '%Y-%m-%d %H:%M:%S').second

# The fourth argument to F.window is startTime: an offset relative to
# 1970-01-01 00:00:00 UTC that shifts where window intervals begin
w = df.groupBy(F.window("date", "60 seconds", "60 seconds", str(startSecond) + " seconds")).agg(F.sum("val").alias("sum"))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "sum").collect()
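Running this should give the window boundaries you asked for:

[Row(start='2016-03-11 09:00:07', end='2016-03-11 09:01:07', sum=2)]

One caveat (an assumption on my part, not part of the answer above): df.head() returns whichever row Spark happens to surface first, which is not guaranteed to be the earliest event. A minimal sketch that derives the offset from the minimum timestamp instead:

from pyspark.sql import functions as F
from datetime import datetime

# Earliest timestamp in the DataFrame, rather than an arbitrary first row
first_ts = df.agg(F.min("date")).head()[0]
startSecond = datetime.strptime(first_ts, '%Y-%m-%d %H:%M:%S').second
w = df.groupBy(F.window("date", "60 seconds", "60 seconds", str(startSecond) + " seconds")).agg(F.sum("val").alias("sum"))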