How to create a "sessionId" column using timestamps and userId in PySpark?

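I have a dataset with the fields userId, event, pageName, and timestamp, but no sessionId. I want to create a sessionId for each record based on the timestamp and a predefined value "finish", which is the number of minutes of inactivity after which a session ends. Only records with the same userId can belong to the same session.

If the "finish" value is 30 minutes (a timestamp difference of 1800), the sample DataFrame is: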

from pyspark.sql import functions as F 
from pyspark.sql.window import Window

df = spark.createDataFrame([
  ("blue", "view", 1610494094750, 11),
  ("green", "add to bag", 1510593114350, 21),
  ("red", "close", 1610493115350, 41),
  ("blue", "view", 1610494094350, 11),
  ("blue", "close", 1510593114312, 21),
  ("red", "view", 1610493114350, 41),
  ("red", "view", 1610593114350, 41),
  ("green", "purchase", 1610494094350, 31)
], ["item", "event", "timestamp", "userId"])


+-----+----------+-------------+------+
| item|     event|    timestamp|userId|
+-----+----------+-------------+------+
| blue|      view|1610494094750|    11|
|green|add to bag|1510593114350|    21|
|  red|     close|1610493115350|    41|
| blue|      view|1610494094350|    11|
| blue|     close|1510593114312|    21|
|  red|      view|1610493114350|    41|
|  red|      view|1610593114350|    41|
|green|  purchase|1610494094350|    31|
+-----+----------+-------------+------+

The final result should look like this:

+--------+----------+-------------+------+---------+
|    item|     event|    timestamp|userId|sessionId|
+--------+----------+-------------+------+---------+
|    blue|     close|1510593114312|    21| session1|
|   green|add to bag|1510593114350|    21| session1|
|     red|      view|1610493114350|    41| session2|
|    blue|      view|1610494094350|    11| session3|
|     red|     close|1610493115350|    41| session2|
|   green|  purchase|1610494094350|    31| session4|
|    blue|      view|1610494094750|    11| session3|
|     red|      view|1610593114350|    41| session5|
+--------+----------+-------------+------+---------+

I am trying to solve this with PySpark. Any suggestions are welcome.

Edit: I have edited the timestamps.

I think this should do the job:

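# Get the previous timestamp for each userId (null on a user's first event)
df = df.withColumn(
    "session_id",
    F.lag("timestamp").over(Window.partitionBy("userId").orderBy("timestamp")),
)

# Flag the start of a new session: 1 if the gap to the previous event is more
# than 1800 (or there is no previous event), otherwise 0.
# The threshold must use the same unit as "timestamp": if the values are epoch
# milliseconds, 30 minutes would be 1800000 rather than 1800.
df = df.withColumn(
    "session_id",
    F.when(F.col("timestamp") - F.col("session_id") <= 1800, 0).otherwise(1),
)

# Running sum of the flags gives a session number per user
# (the same number can exist for different users)
df = df.withColumn(
    "session_id",
    F.sum("session_id").over(Window.partitionBy("userId").orderBy("timestamp")),
)

# Rank the (userId, session number) pairs to get an id that is unique across users.
# Note: a window without partitionBy moves all rows to a single partition.
df = df.withColumn(
    "session_id", F.dense_rank().over(Window.orderBy("userId", "session_id"))
)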
df.show()
+-----+----------+-------------+------+----------+
| item|     event|    timestamp|userId|session_id|
+-----+----------+-------------+------+----------+
| blue|      view|1610494094350|    11|         1|
| blue|      view|1610494094750|    11|         1|
| blue|     close|1510593114312|    21|         2|
|green|add to bag|1510593114350|    21|         2|
|green|  purchase|1610494094350|    31|         3|
|  red|      view|1610493114350|    41|         4|
|  red|     close|1610493115350|    41|         4|
|  red|      view|1610593114350|    41|         5|
+-----+----------+-------------+------+----------+
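
To sanity-check the gap rule without a Spark session, here is a minimal plain-Python sketch of the same idea on the sample rows. The function name `assign_sessions`, the constant `MAX_GAP`, and the choice to number sessions by (session start, userId) are my own assumptions; that ordering is only inferred from the expected table in the question, which does not state a numbering rule.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the question: (item, event, timestamp, userId)
rows = [
    ("blue", "view", 1610494094750, 11),
    ("green", "add to bag", 1510593114350, 21),
    ("red", "close", 1610493115350, 41),
    ("blue", "view", 1610494094350, 11),
    ("blue", "close", 1510593114312, 21),
    ("red", "view", 1610493114350, 41),
    ("red", "view", 1610593114350, 41),
    ("green", "purchase", 1610494094350, 31),
]

# Assumption: same threshold as in the Spark code; it must be expressed
# in the same unit as the timestamps.
MAX_GAP = 1800


def assign_sessions(rows, max_gap=MAX_GAP):
    """Return the rows with a "sessionN" label appended (hypothetical helper)."""
    sessions = []  # each session is a list of rows belonging to one user
    by_user = sorted(rows, key=lambda r: (r[3], r[2]))  # userId, then timestamp
    for _, events in groupby(by_user, key=itemgetter(3)):
        prev_ts = None
        for row in events:
            # A user's first event, or a gap larger than max_gap, opens a new session
            if prev_ts is None or row[2] - prev_ts > max_gap:
                sessions.append([])
            sessions[-1].append(row)
            prev_ts = row[2]
    # Assumption: number sessions by start timestamp, ties broken by userId
    sessions.sort(key=lambda s: (s[0][2], s[0][3]))
    return [
        row + (f"session{i}",)
        for i, session in enumerate(sessions, start=1)
        for row in session
    ]


for labeled_row in assign_sessions(rows):
    print(labeled_row)
```

On the sample data this groups the rows into the same five sessions as the Spark code above. Only the labels differ, because the Spark version ranks by userId while this sketch ranks by session start time.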