How to create a “sessionId” column using timestamps and userid in PySpark?
I have a dataset with the following fields: userId, event, pageName and timestamp, but it is missing a sessionId. I want to create a sessionId for each record based on the timestamp and a predefined value "finish" that indicates after how many minutes of inactivity a session ends. Only records with the same userId can belong to the same session.
If the "finish" value is 30 minutes (a timestamp difference of 1800), the sample DataFrame is:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = spark.createDataFrame([
("blue", "view", 1610494094750, 11),
("green", "add to bag", 1510593114350, 21),
("red", "close", 1610493115350, 41),
("blue", "view", 1610494094350, 11),
("blue", "close", 1510593114312, 21),
("red", "view", 1610493114350, 41),
("red", "view", 1610593114350, 41),
("green", "purchase", 1610494094350, 31)
], ["item", "event", "timestamp", "userId"])
+-----+----------+-------------+------+
| item| event| timestamp|userId|
+-----+----------+-------------+------+
| blue| view|1610494094750| 11|
|green|add to bag|1510593114350| 21|
| red| close|1610493115350| 41|
| blue| view|1610494094350| 11|
| blue| close|1510593114312| 21|
| red| view|1610493114350| 41|
| red| view|1610593114350| 41|
|green| purchase|1610494094350| 31|
+-----+----------+-------------+------+
The final result should look like this:
+--------+----------+-------------+------+---------+
| item| event| timestamp|userId|sessionId|
+--------+----------+-------------+------+---------+
| blue| close|1510593114312| 21| session1|
| green|add to bag|1510593114350| 21| session1|
| red| view|1610493114350| 41| session2|
| blue| view|1610494094350| 11| session3|
| red| close|1610493115350| 41| session2|
| green| purchase|1610494094350| 31| session4|
| blue| view|1610494094750| 11| session3|
| red| view|1610593114350| 41| session5|
+--------+----------+-------------+------+---------+
I am trying to solve this with PySpark. Any suggestions are welcome.
Edit: I have edited the timestamps.
I think this should do the job:
# Get the previous timestamp for each userid
df = df.withColumn(
"session_id",
F.lag("timestamp").over(Window.partitionBy("userid").orderBy("timestamp")),
)
# Flag the start of a new session: 1 if the gap to the previous event exceeds 1800
# (or there is no previous event), 0 otherwise
df = df.withColumn(
"session_id",
F.when(F.col("timestamp") - F.col("session_id") <= 1800, 0).otherwise(1),
)
# Create a per-user session counter by cumulatively summing the flags
# (the same counter value can exist for different users)
df = df.withColumn(
"session_id",
F.sum("session_id").over(Window.partitionBy("userid").orderBy("timestamp")),
)
# Create an id that is unique per (userId, session) pair across all users
df = df.withColumn(
"session_id", F.dense_rank().over(Window.orderBy("userid", "session_id"))
)
df.show()
+-----+----------+-------------+------+----------+
| item| event| timestamp|userId|session_id|
+-----+----------+-------------+------+----------+
| blue| view|1610494094350| 11| 1|
| blue| view|1610494094750| 11| 1|
| blue| close|1510593114312| 21| 2|
|green|add to bag|1510593114350| 21| 2|
|green| purchase|1610494094350| 31| 3|
| red| view|1610493114350| 41| 4|
| red| close|1610493115350| 41| 4|
| red| view|1610593114350| 41| 5|
+-----+----------+-------------+------+----------+
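The expected output in the question uses string labels like session1 rather than integers. If you want that exact format, a minimal sketch (assuming the integer ids produced above are still in session_id) is to concatenate a literal prefix onto the rank:

# Assumed follow-up step: turn the integer ids from dense_rank into
# "session<N>" strings to match the sessionId column in the question
df = df.withColumn(
    "sessionId",
    F.concat(F.lit("session"), F.col("session_id").cast("string")),
).drop("session_id")
df.show()

One thing to double-check is the threshold unit: the constant 1800 is compared directly against the raw timestamp difference, so if your timestamps are epoch milliseconds, a 30-minute gap would correspond to 1800 * 1000 instead.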