Pyspark Window 顺序
Pyspark Window orderBy
我有一个看起来像
的数据框
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| one| a|
| 1| 1| two| t|
| 1| 2| three| a|
| 1| 2| four| t|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| eight| t|
| 1| 9| nine| a|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
如果我通过在 group_id 上对其进行分区并按 id 对其进行排序来执行 Window 操作,那么 orderby 是否会确保已经排序(排序)的行保持相同的顺序?
例如
window_spec = Window.partitionBy(df.group_id).orderBy(df.id)
df = df.withColumn("row_number", row_number().over(window_spec))
永远是
+--------+---+------+----+------+
|group_id| id| text|type|row_number|
+--------+---+------+----+------+
| 1| 1| one| a| 1|
| 1| 1| two| t| 2|
| 1| 2| three| a| 3|
| 1| 2| four| t| 4|
| 1| 5| five| a| 5|
| 1| 6| six| t| 6|
| 1| 7| seven| a| 7|
| 1| 9| eight| t| 8|
| 1| 9| nine| a| 9|
| 1| 10| ten| t| 10|
| 1| 11|eleven| a| 11|
+--------+---+------+----+------+
简而言之,我的问题是,spark Window 的 orderBy 如何处理已经排序(排序)的行?我的假设是它是稳定的,即它不会改变已经排序的行的顺序,但我在文档中找不到与此相关的任何内容。我怎样才能确保我的假设是正确的?
谢谢。
首先,为那些可能不知道稳定排序定义的读者建立上下文,我将引用此Whosebug answer by Joey Adams
"A sorting algorithm is said to be stable if two objects with equal
keys appear in the same order in sorted output as they appear in the
input array to be sorted" - Joey Adams
现在,spark 中的一个 window 函数可以被认为是 Spark 处理整个集合的 mini-DataFrame,其中每个 mini-DataFrame 都是在指定的键上创建的 - "group_id" 在这个案件。
也就是说,如果提供的数据帧有 "group_id"=2,我们最终会得到两个 Windows,其中第一个只包含 "group_id"=1 的数据,另一个"group_id"=2.
注意这一点很重要,因为我们可以测试 .orderBy() 调用对示例数据帧的影响,而不必真正担心 Window 发生了什么。强调正在发生的事情:
- 数据按指定键分区
- 然后将转换应用于在每个 window
中创建的 'mini-DataFrames'
因此,对于预先排序的输入,例如:
df = spark.createDataFrame(
[
{'group_id': 1, 'id': 1, 'text': 'one', 'type': 'a'},
{'group_id': 1, 'id': 1, 'text': 'two', 'type': 't'},
{'group_id': 1, 'id': 2, 'text': 'three', 'type': 'a'},
{'group_id': 1, 'id': 2, 'text': 'four', 'type': 't'},
{'group_id': 1, 'id': 5, 'text': 'five', 'type': 'a'},
{'group_id': 1, 'id': 6, 'text': 'six', 'type': 't'},
{'group_id': 1, 'id': 7, 'text': 'seven', 'type': 'a'},
{'group_id': 1, 'id': 9, 'text': 'eight', 'type': 't'},
{'group_id': 1, 'id': 9, 'text': 'nine', 'type': 'a'},
{'group_id': 1, 'id': 10, 'text': 'ten', 'type': 't'},
{'group_id': 1, 'id': 11, 'text': 'eleven', 'type': 'a'}
]
)
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| one| a|
| 1| 1| two| t|
| 1| 2| three| a|
| 1| 2| four| t|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| eight| t|
| 1| 9| nine| a|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
我们申请:
df.orderBy('id').show()
导致:
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| one| a|
| 1| 1| two| t|
| 1| 2| three| a|
| 1| 2| four| t|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| nine| a|
| 1| 9| eight| t|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
起初,这看起来很稳定,但让我们将其应用于 DataFrame,其中包含 text="two" 的行与包含 text="three":
的行交换
df = spark.createDataFrame(
[
{'group_id': 1, 'id': 1, 'text': 'one', 'type': 'a'},
{'group_id': 1, 'id': 2, 'text': 'three', 'type': 'a'},
{'group_id': 1, 'id': 1, 'text': 'two', 'type': 't'},
{'group_id': 1, 'id': 2, 'text': 'four', 'type': 't'},
{'group_id': 1, 'id': 5, 'text': 'five', 'type': 'a'},
{'group_id': 1, 'id': 6, 'text': 'six', 'type': 't'},
{'group_id': 1, 'id': 7, 'text': 'seven', 'type': 'a'},
{'group_id': 1, 'id': 9, 'text': 'eight', 'type': 't'},
{'group_id': 1, 'id': 9, 'text': 'nine', 'type': 'a'},
{'group_id': 1, 'id': 10, 'text': 'ten', 'type': 't'},
{'group_id': 1, 'id': 11, 'text': 'eleven', 'type': 'a'}
]
)
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| one| a|
| 1| 2| three| a|
| 1| 1| two| t|
| 1| 2| four| t|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| eight| t|
| 1| 9| nine| a|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
然后申请:
df.orderBy(df.id).show()
这导致:
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| two| t|
| 1| 1| one| a|
| 1| 2| four| t|
| 1| 2| three| a|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| nine| a|
| 1| 9| eight| t|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
如您所见,即使行 text="one" 和 text="two" 以相同的顺序出现,.orderBy() 也会将它们调换。因此,我们可以假设 .orderBy() 不是一个稳定的排序。
我有一个看起来像
的数据框+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| one| a|
| 1| 1| two| t|
| 1| 2| three| a|
| 1| 2| four| t|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| eight| t|
| 1| 9| nine| a|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
如果我通过在 group_id 上对其进行分区并按 id 对其进行排序来执行 Window 操作,那么 orderby 是否会确保已经排序(排序)的行保持相同的顺序?
例如
window_spec = Window.partitionBy(df.group_id).orderBy(df.id)
df = df.withColumn("row_number", row_number().over(window_spec))
永远是
+--------+---+------+----+------+
|group_id| id| text|type|row_number|
+--------+---+------+----+------+
| 1| 1| one| a| 1|
| 1| 1| two| t| 2|
| 1| 2| three| a| 3|
| 1| 2| four| t| 4|
| 1| 5| five| a| 5|
| 1| 6| six| t| 6|
| 1| 7| seven| a| 7|
| 1| 9| eight| t| 8|
| 1| 9| nine| a| 9|
| 1| 10| ten| t| 10|
| 1| 11|eleven| a| 11|
+--------+---+------+----+------+
简而言之,我的问题是,spark Window 的 orderBy 如何处理已经排序(排序)的行?我的假设是它是稳定的,即它不会改变已经排序的行的顺序,但我在文档中找不到与此相关的任何内容。我怎样才能确保我的假设是正确的?
谢谢。
首先,为那些可能不知道稳定排序定义的读者建立上下文,我将引用此Whosebug answer by Joey Adams
"A sorting algorithm is said to be stable if two objects with equal keys appear in the same order in sorted output as they appear in the input array to be sorted" - Joey Adams
现在,spark 中的一个 window 函数可以被认为是 Spark 处理整个集合的 mini-DataFrame,其中每个 mini-DataFrame 都是在指定的键上创建的 - "group_id" 在这个案件。
也就是说,如果提供的数据帧有 "group_id"=2,我们最终会得到两个 Windows,其中第一个只包含 "group_id"=1 的数据,另一个"group_id"=2.
注意这一点很重要,因为我们可以测试 .orderBy() 调用对示例数据帧的影响,而不必真正担心 Window 发生了什么。强调正在发生的事情:
- 数据按指定键分区
- 然后将转换应用于在每个 window 中创建的 'mini-DataFrames'
因此,对于预先排序的输入,例如:
df = spark.createDataFrame(
[
{'group_id': 1, 'id': 1, 'text': 'one', 'type': 'a'},
{'group_id': 1, 'id': 1, 'text': 'two', 'type': 't'},
{'group_id': 1, 'id': 2, 'text': 'three', 'type': 'a'},
{'group_id': 1, 'id': 2, 'text': 'four', 'type': 't'},
{'group_id': 1, 'id': 5, 'text': 'five', 'type': 'a'},
{'group_id': 1, 'id': 6, 'text': 'six', 'type': 't'},
{'group_id': 1, 'id': 7, 'text': 'seven', 'type': 'a'},
{'group_id': 1, 'id': 9, 'text': 'eight', 'type': 't'},
{'group_id': 1, 'id': 9, 'text': 'nine', 'type': 'a'},
{'group_id': 1, 'id': 10, 'text': 'ten', 'type': 't'},
{'group_id': 1, 'id': 11, 'text': 'eleven', 'type': 'a'}
]
)
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| one| a|
| 1| 1| two| t|
| 1| 2| three| a|
| 1| 2| four| t|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| eight| t|
| 1| 9| nine| a|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
我们申请:
df.orderBy('id').show()
导致:
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| one| a|
| 1| 1| two| t|
| 1| 2| three| a|
| 1| 2| four| t|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| nine| a|
| 1| 9| eight| t|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
起初,这看起来很稳定,但让我们将其应用于 DataFrame,其中包含 text="two" 的行与包含 text="three":
的行交换df = spark.createDataFrame(
[
{'group_id': 1, 'id': 1, 'text': 'one', 'type': 'a'},
{'group_id': 1, 'id': 2, 'text': 'three', 'type': 'a'},
{'group_id': 1, 'id': 1, 'text': 'two', 'type': 't'},
{'group_id': 1, 'id': 2, 'text': 'four', 'type': 't'},
{'group_id': 1, 'id': 5, 'text': 'five', 'type': 'a'},
{'group_id': 1, 'id': 6, 'text': 'six', 'type': 't'},
{'group_id': 1, 'id': 7, 'text': 'seven', 'type': 'a'},
{'group_id': 1, 'id': 9, 'text': 'eight', 'type': 't'},
{'group_id': 1, 'id': 9, 'text': 'nine', 'type': 'a'},
{'group_id': 1, 'id': 10, 'text': 'ten', 'type': 't'},
{'group_id': 1, 'id': 11, 'text': 'eleven', 'type': 'a'}
]
)
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| one| a|
| 1| 2| three| a|
| 1| 1| two| t|
| 1| 2| four| t|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| eight| t|
| 1| 9| nine| a|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
然后申请:
df.orderBy(df.id).show()
这导致:
+--------+---+------+----+
|group_id| id| text|type|
+--------+---+------+----+
| 1| 1| two| t|
| 1| 1| one| a|
| 1| 2| four| t|
| 1| 2| three| a|
| 1| 5| five| a|
| 1| 6| six| t|
| 1| 7| seven| a|
| 1| 9| nine| a|
| 1| 9| eight| t|
| 1| 10| ten| t|
| 1| 11|eleven| a|
+--------+---+------+----+
如您所见,即使行 text="one" 和 text="two" 以相同的顺序出现,.orderBy() 也会将它们调换。因此,我们可以假设 .orderBy() 不是一个稳定的排序。