Spark Window 函数 - rangeBetween 日期
Spark Window Functions - rangeBetween dates
我有一个带有数据的 Spark SQL DataFrame
,我想要获取的是给定日期范围内当前行之前的所有行。因此,例如,我想要从给定行之前的 7 天开始的所有行。我发现我需要使用 Window Function
,例如:
Window \
.partitionBy('id') \
.orderBy('start')
问题来了。我想要 rangeBetween
7 天,但我在 Spark 文档中找不到任何相关内容。 Spark 甚至提供这样的选项吗?现在我只是得到所有前面的行:
.rowsBetween(-sys.maxsize, 0)
但想实现类似的目标:
.rangeBetween("7 days", 0)
如果有人能在这方面帮助我,我将不胜感激。提前致谢!
火花 >= 2.3
从 Spark 2.3 开始,可以使用 SQL API 来使用区间对象,但是 DataFrame
API 支持 still work in progress.
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, mean(some_value) OVER (
PARTITION BY id
ORDER BY CAST(start AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS mean FROM df""").show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
Spark < 2.3
据我所知,无论是在 Spark 还是 Hive 中都不可能直接实现。两者都要求与 RANGE
一起使用的 ORDER BY
子句是数字。我发现最接近的是转换为时间戳并按秒操作。假设 start
列包含 date
类型:
from pyspark.sql import Row
row = Row("id", "start", "some_value")
df = sc.parallelize([
row(1, "2015-01-01", 20.0),
row(1, "2015-01-06", 10.0),
row(1, "2015-01-07", 25.0),
row(1, "2015-01-12", 30.0),
row(2, "2015-01-01", 5.0),
row(2, "2015-01-03", 30.0),
row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))
一个小帮手和window定义:
from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col
# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400
最后查询:
w = (Window()
.partitionBy(col("id"))
.orderBy(col("start").cast("timestamp").cast("long"))
.rangeBetween(-days(7), 0))
df.select(col("*"), mean("some_value").over(w).alias("mean")).show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
远非漂亮,但有效。
很棒的解决方案@zero323,如果你想像我那样用分钟而不是天来操作,而且你不需要用 id 进行分区,所以你只需要如我所示修改代码的一部分:
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, sum(total) OVER (
ORDER BY CAST(reading_date AS timestamp)
RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
) AS sum_total FROM df""").show()
我有一个带有数据的 Spark SQL DataFrame
,我想要获取的是给定日期范围内当前行之前的所有行。因此,例如,我想要从给定行之前的 7 天开始的所有行。我发现我需要使用 Window Function
,例如:
Window \
.partitionBy('id') \
.orderBy('start')
问题来了。我想要 rangeBetween
7 天,但我在 Spark 文档中找不到任何相关内容。 Spark 甚至提供这样的选项吗?现在我只是得到所有前面的行:
.rowsBetween(-sys.maxsize, 0)
但想实现类似的目标:
.rangeBetween("7 days", 0)
如果有人能在这方面帮助我,我将不胜感激。提前致谢!
火花 >= 2.3
从 Spark 2.3 开始,可以使用 SQL API 来使用区间对象,但是 DataFrame
API 支持 still work in progress.
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, mean(some_value) OVER (
PARTITION BY id
ORDER BY CAST(start AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS mean FROM df""").show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
Spark < 2.3
据我所知,无论是在 Spark 还是 Hive 中都不可能直接实现。两者都要求与 RANGE
一起使用的 ORDER BY
子句是数字。我发现最接近的是转换为时间戳并按秒操作。假设 start
列包含 date
类型:
from pyspark.sql import Row
row = Row("id", "start", "some_value")
df = sc.parallelize([
row(1, "2015-01-01", 20.0),
row(1, "2015-01-06", 10.0),
row(1, "2015-01-07", 25.0),
row(1, "2015-01-12", 30.0),
row(2, "2015-01-01", 5.0),
row(2, "2015-01-03", 30.0),
row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))
一个小帮手和window定义:
from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col
# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400
最后查询:
w = (Window()
.partitionBy(col("id"))
.orderBy(col("start").cast("timestamp").cast("long"))
.rangeBetween(-days(7), 0))
df.select(col("*"), mean("some_value").over(w).alias("mean")).show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
远非漂亮,但有效。
很棒的解决方案@zero323,如果你想像我那样用分钟而不是天来操作,而且你不需要用 id 进行分区,所以你只需要如我所示修改代码的一部分:
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, sum(total) OVER (
ORDER BY CAST(reading_date AS timestamp)
RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
) AS sum_total FROM df""").show()