How to carry over the last non-empty value to subsequent rows using Spark DataFrame
I have a sparse dataset like this:
ip,ts,session
"123","1","s1"
"123","2",""
"123","3",""
"123","4",""
"123","10","s2"
"123","11",""
"123","12",""
"222","5","s6"
"222","6",""
"222","7",""
I need to make it dense, like this:
ip,ts,session
"123","1","s1"
"123","2","s1"
"123","3","s1"
"123","4","s1"
"123","10","s2"
"123","11","s2"
"123","12","s2"
"222","5","s6"
"222","6","s6"
"222","7","s6"
I know how to do this with RDDs: repartition by ip and then, within partitionMap, groupBy(ip).sortBy(ts).scan()(). The scan function carries the previously computed value over to the next iteration, decides whether to use the previous value or keep the current one, and passes its choice on to the next "scan" iteration.
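The scan idea described above can be sketched with plain Scala collections (my sketch, not the actual RDD job; the same logic would run inside a per-partition iterator):

```scala
// Sample rows: (ip, ts, session), where "" marks a missing session.
val rows = Seq(
  ("123", 1, "s1"), ("123", 2, ""), ("123", 3, ""),
  ("222", 5, "s6"), ("222", 6, "")
)

val dense = rows
  .groupBy(_._1)                      // group by ip
  .values
  .flatMap { group =>
    group
      .sortBy(_._2)                   // sort by ts within the group
      .scanLeft(("", 0, "")) { case ((_, _, prevSess), (ip, ts, sess)) =>
        // keep the current session if present, otherwise carry the previous one
        (ip, ts, if (sess.nonEmpty) sess else prevSess)
      }
      .drop(1)                        // drop the seed element added by scanLeft
  }
```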
Now I am trying to do this using DataFrames only, without going back to RDDs.
I have been looking at Window functions, but all I can come up with is the first value within a group, which is not the same thing. Or maybe I just don't understand how to create the right frame.
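For reference, a windowed approach does exist (my sketch, assuming a Spark version where last() supports ignoreNulls): map empty strings to null, then take the last non-null session over a per-ip window ordered by ts:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, when, lit}

// Frame: from the start of the partition up to the current row.
val w = Window
  .partitionBy("ip")
  .orderBy("ts")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val dense = data.withColumn(
  "session",
  last(
    when(col("session") === "", lit(null)).otherwise(col("session")),
    ignoreNulls = true
  ).over(w)
)
```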
You can do it using multiple self-joins. Basically, you want to create a dataset of all the "session start" records (filter($"session" !== "")), join it back to the original dataset, and filter out the "session start" records that are later than the current row (filter($"ts" >= $"r_ts")). Then you find the max($"r_ts") for each ip/ts pair. The last join simply retrieves the session value from the original dataset.
data.join(
  data.filter($"session" !== "").select(
    $"ip" as "r_ip", $"session" as "r_session", $"ts" as "r_ts"
  ),
  $"ip" === $"r_ip"
)
  .filter($"ts" >= $"r_ts")
  .groupBy($"ip", $"ts")
  .agg(max($"r_ts") as "r_ts")
  .join(
    data.select($"session", $"ts" as "l_ts"),
    $"r_ts" === $"l_ts"
  )
  .select($"ip", $"ts", $"session")
By the way, my solution assumes that the ts column is something like a transaction sequence, i.e. an increasing Int value. If it isn't, you can use my [...] to create a column that serves the same purpose.
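The link in the answer above is missing here. One common way to get such an increasing column (my assumption, not necessarily what the answerer had in mind) is monotonically_increasing_id(), which is increasing but not consecutive:

```scala
import org.apache.spark.sql.functions.monotonically_increasing_id

// Adds a monotonically increasing Long column to stand in for ts.
val dataWithSeq = data.withColumn("seq", monotonically_increasing_id())
```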
My final code reuses David Griffin's idea. dataWithSessionSparse is the starting dataset described in my question:
val denseSessRecordsOnly = dataWithSessionSparse
  .filter(col("sessionId") !== "")
  .select(
    col("ip").alias("r_ip"),
    col("sessionId").alias("r_sessionId"),
    col("ts").alias("r_ts")
  ) // isolates the first record of every session

val dataWithSessionDense = dataWithSessionSparse
  .join(denseSessRecordsOnly, col("ip") === col("r_ip")) // relates each event to all sessions within its ip
  .filter(col("ts") >= col("r_ts")) // keeps only the sessions that started at or before the event
  .groupBy(col("ip"), col("ts"))
  .agg(max(col("r_ts")).alias("r_ts")) // takes the sessionId with the max start ts
  .join(
    denseSessRecordsOnly.select(
      col("r_ip").alias("l_ip"),
      col("r_sessionId").alias("sessionId"),
      col("r_ts").alias("l_ts")
    ),
    col("r_ts") === col("l_ts") && col("ip") === col("l_ip")
  )
  .select(col("ip"), col("ts"), col("sessionId"))
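For completeness, a minimal driver for the snippet above (my sketch; the SparkSession setup and schema are assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max}

val spark = SparkSession.builder()
  .appName("dense-sessions")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// The sparse dataset from the question, with the session column named sessionId.
val dataWithSessionSparse = Seq(
  ("123", 1, "s1"), ("123", 2, ""), ("123", 3, ""), ("123", 4, ""),
  ("123", 10, "s2"), ("123", 11, ""), ("123", 12, ""),
  ("222", 5, "s6"), ("222", 6, ""), ("222", 7, "")
).toDF("ip", "ts", "sessionId")
```

Running the joins above on this DataFrame and calling .orderBy("ip", "ts").show() should reproduce the dense table from the question.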