Spark SQL window function with complex condition
This is probably easiest to explain through example. Suppose I have a DataFrame of user logins to a website, for instance:
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
I would like to add a column to this indicating when they became an active user on the site. But there is one caveat: there is a time period during which a user is considered active, and after this period, if they log in again, their became_active date resets. Suppose this period is 5 days. Then the desired table derived from the one above would be:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
So, in particular, SirChillingtonIV's became_active date was reset because their second login came after the active period had expired, but Booooooo99900098's became_active date was not reset on the second login, because it fell within the active period.
My initial thought was to use window functions with lag, and then use the lagged values to fill the became_active column; for instance, something starting roughly like:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Then, the rule for filling in the became_active date would be: if tmp is null (i.e., if it is the first-ever login) or if login_date - tmp >= 5, then became_active = login_date; otherwise, go to the next most recent value in tmp and apply the same rule. This suggests a recursive approach, which I am having trouble imagining a way to implement.
My questions: Is this a viable approach, and if so, how can I "go back" and look at earlier values of tmp until I find one where I stop? As far as I know, I cannot iterate over the values of a Spark SQL Column. Is there another way to achieve this result?
Spark >= 3.2
Recent Spark versions provide native support for session windows in both batch and structured streaming queries (see SPARK-10816 and its sub-tasks, especially SPARK-34893).
The official documentation provides a nice usage example.
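As a rough sketch of how the native API could map onto this problem (assuming Spark >= 3.2; session_window expects a timestamp column, so login_date is cast first, and its gap semantics may treat a boundary of exactly 5 days slightly differently from the strict > 5 comparison used below):

import org.apache.spark.sql.functions.{col, min, session_window, to_timestamp}

// session_window needs a time column, so cast the string date to a timestamp
val withTs = df.withColumn("login_ts", to_timestamp(col("login_date")))

// one row per (user, session): the earliest login in each 5-day-gap session
val becameActive = withTs
  .groupBy(col("user_name"), session_window(col("login_ts"), "5 days"))
  .agg(min(col("login_date")).as("became_active"))

Joining becameActive back to the login rows on user_name and the session's time range would reproduce the per-row became_active column computed below.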
Spark < 3.2
Here is the trick. Import a bunch of functions:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}

// the $"..." syntax and toDF below assume spark.implicits._ is in scope
Define the windows:
val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")
Find the points where new sessions start, i.e. where the gap since the user's previous login exceeds 5 days:
// flag is 1 when the gap since the previous login exceeds 5 days, 0 otherwise
// (coalesce turns the NULL gap on a user's first login into 0)
val newSession = (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")

// a running sum of the flags assigns each row a per-user session id
val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
Find the earliest date per session:
val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")
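To see what the running sum produced, you can inspect the intermediate frame before the session column is dropped (purely a sanity check on the sessionized frame built above):

sessionized.orderBy($"user_name", $"login_date").select("user_name", "login_date", "session").show()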
With the dataset defined as:
val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"),
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")
the result is:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-04| 2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11| 2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14| 2012-01-11|
|SirChillingtonIV|2012-08-11| 2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
+----------------+----------+-------------+
Reworked for PySpark
In PySpark you can do the same as shown below.
Create the data frame:
# sqlContext is the pre-2.0 entry point; spark.createDataFrame works the same way
df = sqlContext.createDataFrame(
    [
        ("SirChillingtonIV", "2012-01-04"),
        ("Booooooo99900098", "2012-01-04"),
        ("Booooooo99900098", "2012-01-06"),
        ("OprahWinfreyJr", "2012-01-10"),
        ("SirChillingtonIV", "2012-01-11"),
        ("SirChillingtonIV", "2012-01-14"),
        ("SirChillingtonIV", "2012-08-11")
    ],
    ("user_name", "login_date")
)
The above code creates a data frame like the one below:
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
|SirChillingtonIV|2012-01-14|
|SirChillingtonIV|2012-08-11|
+----------------+----------+
Now we first want to find the places where the difference between consecutive login_date values is more than 5 days. To do this, proceed as below.
Necessary imports:
from pyspark.sql import functions as f
from pyspark.sql import Window
# defining window partitions
login_window = Window.partitionBy("user_name").orderBy("login_date")
session_window = Window.partitionBy("user_name", "session")
# flag logins that come more than 5 days after the previous one,
# then take a running sum of the flags to get a per-user session id
session_df = df.withColumn(
    "session",
    f.sum(
        (f.coalesce(
            f.datediff("login_date", f.lag("login_date", 1).over(login_window)),
            f.lit(0)
        ) > 5).cast("int")
    ).over(login_window)
)
When we run the above line of code, if date_diff is NULL the coalesce function replaces the NULL with 0. The flag is 1 whenever the gap exceeds 5 days, and the running sum of the flags numbers the sessions: for SirChillingtonIV, for example, the gaps are 0 (first login), 7, 3, and 210 days, so the flags are 0, 1, 0, 1 and their running sum gives session ids 0, 1, 1, 2:
+----------------+----------+-------+
| user_name|login_date|session|
+----------------+----------+-------+
| OprahWinfreyJr|2012-01-10| 0|
|SirChillingtonIV|2012-01-04| 0|
|SirChillingtonIV|2012-01-11| 1|
|SirChillingtonIV|2012-01-14| 1|
|SirChillingtonIV|2012-08-11| 2|
|Booooooo99900098|2012-01-04| 0|
|Booooooo99900098|2012-01-06| 0|
+----------------+----------+-------+
# add the became_active column: the min login_date over each window
# partitioned by the user_name and session columns created in the step above
final_df = session_df.withColumn(
    "became_active", f.min("login_date").over(session_window)
).drop("session")

The final result:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-04| 2012-01-04|
|SirChillingtonIV|2012-01-11| 2012-01-11|
|SirChillingtonIV|2012-01-14| 2012-01-11|
|SirChillingtonIV|2012-08-11| 2012-08-11|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
+----------------+----------+-------------+