从具有不同值的第一行获取值

Get value from first lead row that has a different value

我有一个 ID 列表、一个消息序列号 (seq) 和一个值(例如时间戳)。多行可以有相同的序号。每行中还有一些其他列具有不同的值,但我将它们排除在外,因为它们并不重要。

在来自 deviceId (=partitionBy) 的所有消息中,我需要按 sequence_number (=orderBy) 排序并添加 'ts' 下一条消息的 'ts' 值与当前 sequence_number.

的所有消息不同的 sequence_number

如果该行具有不同的序列号,我会检索下一行的值。但是由于“具有不同序列号的下一行”可能是 x 行很远,所以我必须为前面的 x 行添加特定的 .when(condition, ...) 块。

我想知道是否有更好的解决方案,无论具有不同序列号的下一行有多“远”,它都有效。我尝试了 .otherwise(lead(col("next_value"), 1),但因为我只是在构建专栏,所以它不起作用。

我的代码和可重现的例子:

data = [
    (1, 1, "A"),
    (2, 1, "G"),
    (2, 2, "F"),
    (3, 1, "A"),
    (4, 1, "A"),
    (4, 2, "B"),
    (4, 3, "C"),
    (4, 3, "C"),
    (4, 3, "C"),
    (4, 4, "D")
]

df = spark.createDataFrame(data=data, schema=["id", "seq", "ts"])

df.printSchema()
df.show(10, False)


window = Window \
    .orderBy("id", "seq") \
    .partitionBy("id")
# I could potentially do this 100x if the next lead-value is 100 rows away, but I wonder if there isn't a better solution.
is_different_seq1 = lead(col("seq"), 1).over(window) != col("seq")
is_different_seq2 = lead(col("seq"), 2).over(window) != col("seq")

df = df.withColumn("lead_value",
                   when(is_different_seq1,
                        lead(col("ts"), 1).over(window)
                        )
                   .when(is_different_seq2,
                        lead(col("ts"), 2).over(window)
                    )

                   )

df.printSchema()
df.show(10, False)

id=4 的“next_value”列中的理想输出:

id seq ts next_value
4 1 A B
4 2 B C
4 3 C D
4 3 C D
4 3 C D
4 4 D Null

我找到了解决方案(但是非常慢),所以如果有人提出更好的解决方案,请添加您的答案!

我为每条“消息”获取一行,并在其中执行 lead(1),然后将其连接回数据框与其余列。

df_filtered = df.select("id", "seq", "ts").distinct()
df_filtered = df_filtered.withColumn("lead_value", lead(col("ts"), 1).over(window))
df = df.join(df_filtered, on=["id", "seq", "ts"])

我还没有尝试过更复杂的情况,所以这可能还需要更多的调整,但我认为你可以结合 last 功能。

仅使用 lead 函数,结果如下。

id seq ts lead_value
4 1 A B
4 2 B C
4 3 C C
4 3 C C
4 3 C D
4 4 D Null

您想将第 3 行和第 4 行的 lead_value 覆盖为“D”,这是同一 id&[=16= 中 lead_value 的最后一个值]组。

lead_window = (Window
    .partitionBy("deviceId")
    .orderBy("seq"))

last_window = (Window
    .partitionBy('deviceId', 'seq')
    .rowsBetween(0, Window.unboundedFollowing)) 

df = df.withColumn('next_value', F.last(
        F.lead(F.col('ts')).over(lead_window)
    ).over(last_window))

结果。

id seq ts next_value
4 1 A B
4 2 B C
4 3 C D
4 3 C D
4 3 C D
4 4 D Null