从具有不同值的第一行获取值
Get value from first lead row that has a different value
我有一个 ID 列表、一个消息序列号 (seq) 和一个值(例如时间戳)。多行可以有相同的序号。每行中还有一些其他列具有不同的值,但我将它们排除在外,因为它们并不重要。
在来自 deviceId (=partitionBy) 的所有消息中,我需要按 sequence_number (=orderBy) 排序并添加 'ts' 下一条消息的 'ts' 值与当前 sequence_number.
的所有消息不同的 sequence_number
如果该行具有不同的序列号,我会检索下一行的值。但是由于“具有不同序列号的下一行”可能是 x
行很远,所以我必须为前面的 x
行添加特定的 .when(condition, ...)
块。
我想知道是否有更好的解决方案,无论具有不同序列号的下一行有多“远”,它都有效。我尝试了 .otherwise(lead(col("next_value"), 1)
,但因为我只是在构建专栏,所以它不起作用。
我的代码和可重现的例子:
data = [
(1, 1, "A"),
(2, 1, "G"),
(2, 2, "F"),
(3, 1, "A"),
(4, 1, "A"),
(4, 2, "B"),
(4, 3, "C"),
(4, 3, "C"),
(4, 3, "C"),
(4, 4, "D")
]
df = spark.createDataFrame(data=data, schema=["id", "seq", "ts"])
df.printSchema()
df.show(10, False)
window = Window \
.orderBy("id", "seq") \
.partitionBy("id")
# I could potentially do this 100x if the next lead-value is 100 rows away, but I wonder if there isn't a better solution.
is_different_seq1 = lead(col("seq"), 1).over(window) != col("seq")
is_different_seq2 = lead(col("seq"), 2).over(window) != col("seq")
df = df.withColumn("lead_value",
when(is_different_seq1,
lead(col("ts"), 1).over(window)
)
.when(is_different_seq2,
lead(col("ts"), 2).over(window)
)
)
df.printSchema()
df.show(10, False)
id=4
的“next_value”列中的理想输出:
id
seq
ts
next_value
4
1
A
B
4
2
B
C
4
3
C
D
4
3
C
D
4
3
C
D
4
4
D
Null
我找到了解决方案(但是非常慢),所以如果有人提出更好的解决方案,请添加您的答案!
我为每条“消息”获取一行,并在其中执行 lead(1),然后将其连接回数据框与其余列。
df_filtered = df.select("id", "seq", "ts").distinct()
df_filtered = df_filtered.withColumn("lead_value", lead(col("ts"), 1).over(window))
df = df.join(df_filtered, on=["id", "seq", "ts"])
我还没有尝试过更复杂的情况,所以这可能还需要更多的调整,但我认为你可以结合 last
功能。
仅使用 lead
函数,结果如下。
id
seq
ts
lead_value
4
1
A
B
4
2
B
C
4
3
C
C
4
3
C
C
4
3
C
D
4
4
D
Null
您想将第 3 行和第 4 行的 lead_value
覆盖为“D”,这是同一 id
&[=16= 中 lead_value
的最后一个值]组。
lead_window = (Window
.partitionBy("deviceId")
.orderBy("seq"))
last_window = (Window
.partitionBy('deviceId', 'seq')
.rowsBetween(0, Window.unboundedFollowing))
df = df.withColumn('next_value', F.last(
F.lead(F.col('ts')).over(lead_window)
).over(last_window))
结果。
id
seq
ts
next_value
4
1
A
B
4
2
B
C
4
3
C
D
4
3
C
D
4
3
C
D
4
4
D
Null
我有一个 ID 列表、一个消息序列号 (seq) 和一个值(例如时间戳)。多行可以有相同的序号。每行中还有一些其他列具有不同的值,但我将它们排除在外,因为它们并不重要。
在来自 deviceId (=partitionBy) 的所有消息中,我需要按 sequence_number (=orderBy) 排序并添加 'ts' 下一条消息的 'ts' 值与当前 sequence_number.
的所有消息不同的 sequence_number如果该行具有不同的序列号,我会检索下一行的值。但是由于“具有不同序列号的下一行”可能是 x
行很远,所以我必须为前面的 x
行添加特定的 .when(condition, ...)
块。
我想知道是否有更好的解决方案,无论具有不同序列号的下一行有多“远”,它都有效。我尝试了 .otherwise(lead(col("next_value"), 1)
,但因为我只是在构建专栏,所以它不起作用。
我的代码和可重现的例子:
data = [
(1, 1, "A"),
(2, 1, "G"),
(2, 2, "F"),
(3, 1, "A"),
(4, 1, "A"),
(4, 2, "B"),
(4, 3, "C"),
(4, 3, "C"),
(4, 3, "C"),
(4, 4, "D")
]
df = spark.createDataFrame(data=data, schema=["id", "seq", "ts"])
df.printSchema()
df.show(10, False)
window = Window \
.orderBy("id", "seq") \
.partitionBy("id")
# I could potentially do this 100x if the next lead-value is 100 rows away, but I wonder if there isn't a better solution.
is_different_seq1 = lead(col("seq"), 1).over(window) != col("seq")
is_different_seq2 = lead(col("seq"), 2).over(window) != col("seq")
df = df.withColumn("lead_value",
when(is_different_seq1,
lead(col("ts"), 1).over(window)
)
.when(is_different_seq2,
lead(col("ts"), 2).over(window)
)
)
df.printSchema()
df.show(10, False)
id=4
的“next_value”列中的理想输出:
id | seq | ts | next_value |
---|---|---|---|
4 | 1 | A | B |
4 | 2 | B | C |
4 | 3 | C | D |
4 | 3 | C | D |
4 | 3 | C | D |
4 | 4 | D | Null |
我找到了解决方案(但是非常慢),所以如果有人提出更好的解决方案,请添加您的答案!
我为每条“消息”获取一行,并在其中执行 lead(1),然后将其连接回数据框与其余列。
df_filtered = df.select("id", "seq", "ts").distinct()
df_filtered = df_filtered.withColumn("lead_value", lead(col("ts"), 1).over(window))
df = df.join(df_filtered, on=["id", "seq", "ts"])
我还没有尝试过更复杂的情况,所以这可能还需要更多的调整,但我认为你可以结合 last
功能。
仅使用 lead
函数,结果如下。
id | seq | ts | lead_value |
---|---|---|---|
4 | 1 | A | B |
4 | 2 | B | C |
4 | 3 | C | C |
4 | 3 | C | C |
4 | 3 | C | D |
4 | 4 | D | Null |
您想将第 3 行和第 4 行的 lead_value
覆盖为“D”,这是同一 id
&[=16= 中 lead_value
的最后一个值]组。
lead_window = (Window
.partitionBy("deviceId")
.orderBy("seq"))
last_window = (Window
.partitionBy('deviceId', 'seq')
.rowsBetween(0, Window.unboundedFollowing))
df = df.withColumn('next_value', F.last(
F.lead(F.col('ts')).over(lead_window)
).over(last_window))
结果。
id | seq | ts | next_value |
---|---|---|---|
4 | 1 | A | B |
4 | 2 | B | C |
4 | 3 | C | D |
4 | 3 | C | D |
4 | 3 | C | D |
4 | 4 | D | Null |