如何循环遍历数据集以创建摘要数据集
How to loop through dataset to create a dataset of summary
我刚刚开始学习和使用Spark,目前遇到一个问题。任何建议或提示将不胜感激。
基本上我有一个数据集,其中包含不同用户的各种事件,如 AppLaunch、GameStart、GameEnd 等,我想创建每个用户每次 he/she 启动时的操作摘要应用
例如:我有以下数据集:
UserId | Event Type | Time | GameType | Event Id|
11111 | AppLauch | 11:01:53| null | 101 |
11111 | GameStart | 11:01:59| Puzzle | 102 |
11111 | GameEnd | 11:05:31| Puzzle | 103 |
11111 | GameStart | 11:05:58| Word | 104 |
11111 | GameEnd | 11:09:13| Word | 105 |
11111 | AppEnd | 11:09:24| null | 106 |
11111 | AppLauch | 12:03:43| null | 107 |
22222 | AppLauch | 12:03:52| null | 108 |
22222 | GameStart | 12:03:59| Puzzle | 109 |
11111 | GameStart | 12:04:01| Puzzle | 110 |
22222 | GameEnd | 12:06:11| Puzzle | 111 |
11111 | GameEnd | 12:06:13| Puzzle | 112 |
11111 | AppEnd | 12:06:23| null | 113 |
22222 | AppEnd | 12:06:33| null | 114 |
而我想要的是与此类似的数据集:
EventId | USerId| Event Type | Time | FirstGamePlayed| LastGamePlayed|
101 |11111 | AppLauch | 11:01:53| Puzzle | Word |
107 |11111 | AppLauch | 12:03:43| Puzzle | Puzzle |
108 |22222 | AppLauch | 12:03:52| Puzzle | Puzzle |
只需要知道玩的第一场和最后一场比赛,即使在一次应用启动中玩了超过 3 场比赛。
我最初的想法是按用户 Id 和 window 时间范围(AppLaunch 到 AppEnd)将它们分组,然后找到一种方法扫描数据集,如果有 gameStart 事件并且它落下进入任何 window,它将是 FirstGamePlayed,AppEnd 时间之前的最后一个 GameStart 事件将是 LastGamePlayed。但是我没有找到实现这个的方法。
任何 hint/suggestion 都很好。
谢谢
我认为这可以使用 window 函数然后像这样进行聚合来解决:
df
// enumerate AppLaunches
.withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
// get first last game per AppLaunch
.withColumn("firstGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".asc)))
.withColumn("lastGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".desc)))
// now aggregate
.groupBy($"AppLauchNr")
.agg(
first($"UserId").as("UserId"),
min($"EventId").as("EventId"),
lit("AppLauch").as("EventType"), // this is always AppLauch
min($"Time").as("Time"),
first($"firstGamePlayed", true).as("firstGamePlayed"),
first($"lastGamePlayed", true).as("lastGamePlayed")
)
.drop($"AppLauchNr")
第一个和最后一个游戏也可以使用 orderBy().groupBy()
而不是 window 函数来确定,但我仍然不确定 spark 在聚合期间保留顺序(这在文档,请参见 and discussion in https://issues.apache.org/jira/browse/SPARK-16207)
df
.withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
.orderBy($"UserId",$"AppLauchNr",$"Time")
.groupBy($"UserId",$"AppLauchNr")
.agg(
first($"EventId").as("EventId"),
first($"EventType").as("EventType"),
first($"Time").as("Time"),
first($"GameType", true).as("firstGamePlayed"),
last($"GameType", true).as("lastGamePlayed")
)
.drop($"AppLauchNr")
我刚刚开始学习和使用Spark,目前遇到一个问题。任何建议或提示将不胜感激。
基本上我有一个数据集,其中包含不同用户的各种事件,如 AppLaunch、GameStart、GameEnd 等,我想创建每个用户每次 he/she 启动时的操作摘要应用
例如:我有以下数据集:
UserId | Event Type | Time | GameType | Event Id|
11111 | AppLauch | 11:01:53| null | 101 |
11111 | GameStart | 11:01:59| Puzzle | 102 |
11111 | GameEnd | 11:05:31| Puzzle | 103 |
11111 | GameStart | 11:05:58| Word | 104 |
11111 | GameEnd | 11:09:13| Word | 105 |
11111 | AppEnd | 11:09:24| null | 106 |
11111 | AppLauch | 12:03:43| null | 107 |
22222 | AppLauch | 12:03:52| null | 108 |
22222 | GameStart | 12:03:59| Puzzle | 109 |
11111 | GameStart | 12:04:01| Puzzle | 110 |
22222 | GameEnd | 12:06:11| Puzzle | 111 |
11111 | GameEnd | 12:06:13| Puzzle | 112 |
11111 | AppEnd | 12:06:23| null | 113 |
22222 | AppEnd | 12:06:33| null | 114 |
而我想要的是与此类似的数据集:
EventId | USerId| Event Type | Time | FirstGamePlayed| LastGamePlayed|
101 |11111 | AppLauch | 11:01:53| Puzzle | Word |
107 |11111 | AppLauch | 12:03:43| Puzzle | Puzzle |
108 |22222 | AppLauch | 12:03:52| Puzzle | Puzzle |
只需要知道玩的第一场和最后一场比赛,即使在一次应用启动中玩了超过 3 场比赛。
我最初的想法是按用户 Id 和 window 时间范围(AppLaunch 到 AppEnd)将它们分组,然后找到一种方法扫描数据集,如果有 gameStart 事件并且它落下进入任何 window,它将是 FirstGamePlayed,AppEnd 时间之前的最后一个 GameStart 事件将是 LastGamePlayed。但是我没有找到实现这个的方法。
任何 hint/suggestion 都很好。
谢谢
我认为这可以使用 window 函数然后像这样进行聚合来解决:
df
// enumerate AppLaunches
.withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
// get first last game per AppLaunch
.withColumn("firstGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".asc)))
.withColumn("lastGamePlayed", first($"GameType", true).over(Window.partitionBy($"UserId", $"AppLauchNr").orderBy($"Time".desc)))
// now aggregate
.groupBy($"AppLauchNr")
.agg(
first($"UserId").as("UserId"),
min($"EventId").as("EventId"),
lit("AppLauch").as("EventType"), // this is always AppLauch
min($"Time").as("Time"),
first($"firstGamePlayed", true).as("firstGamePlayed"),
first($"lastGamePlayed", true).as("lastGamePlayed")
)
.drop($"AppLauchNr")
第一个和最后一个游戏也可以使用 orderBy().groupBy()
而不是 window 函数来确定,但我仍然不确定 spark 在聚合期间保留顺序(这在文档,请参见
df
.withColumn("AppLauchNr", sum(when($"EventType" === "AppLauch", 1)).over(Window.partitionBy($"UserId").orderBy($"Time".asc)))
.orderBy($"UserId",$"AppLauchNr",$"Time")
.groupBy($"UserId",$"AppLauchNr")
.agg(
first($"EventId").as("EventId"),
first($"EventType").as("EventType"),
first($"Time").as("Time"),
first($"GameType", true).as("firstGamePlayed"),
last($"GameType", true).as("lastGamePlayed")
)
.drop($"AppLauchNr")