pyspark collect_list with groupby and row_number issue: order of rows changes each time I call show()

Is the behavior shown below expected, or is it a bug?

Create a DataFrame:

data_list = [
    ['Blue', 2, 3, 1],
    ['Green', 1, 5, 4],
    ['Green', 4, 1, 3],
    ['Blue', 2, 4, 1],
    ['Green', 1, 5, 2]
]
all_cols = ['COLOR','COL1','COL2','COL3']
df = sqlContext.createDataFrame(data_list, all_cols)
df.show()
+-----+----+----+----+
|COLOR|COL1|COL2|COL3|
+-----+----+----+----+
| Blue|   2|   3|   1|
|Green|   1|   5|   4|
|Green|   4|   1|   3|
| Blue|   2|   4|   1|
|Green|   1|   5|   2|
+-----+----+----+----+

Add a ROW_ID:

df.createOrReplaceTempView('df')
df = spark.sql('select row_number() over (order by "COLOR") as ROW_ID, * from df')
df.printSchema()
root
 |-- ROW_ID: integer (nullable = true)
 |-- COLOR: string (nullable = true)
 |-- COL1: long (nullable = true)
 |-- COL2: long (nullable = true)
 |-- COL3: long (nullable = true)
df.show()
+------+-----+----+----+----+
|ROW_ID|COLOR|COL1|COL2|COL3|
+------+-----+----+----+----+
|     1|Green|   4|   1|   3|
|     2| Blue|   2|   4|   1|
|     3|Green|   1|   5|   2|
|     4| Blue|   2|   3|   1|
|     5|Green|   1|   5|   4|
+------+-----+----+----+----+

Create another DF by applying a groupby to the first one:

from pyspark.sql import functions as f

grp_df = df.groupby('COLOR').agg(f.collect_list('ROW_ID').alias('IDX_VAL'))
grp_df.show()
+-----+---------+
|COLOR|  IDX_VAL|
+-----+---------+
|Green|[1, 3, 5]|
| Blue|   [2, 4]|
+-----+---------+
grp_df.printSchema()
root
 |-- COLOR: string (nullable = true)
 |-- IDX_VAL: array (nullable = true)
 |    |-- element: integer (containsNull = true)

If I run grp_df.show() again, the list elements in the IDX_VAL column have changed!!! See below:

grp_df.show()
+-----+---------+
|COLOR|  IDX_VAL|
+-----+---------+
|Green|[2, 3, 5]|
| Blue|   [1, 4]|
+-----+---------+

The issue in the grouped DataFrame is actually a side effect of the seemingly random ordering of the original DataFrame. Notice that in your example the ROW_ID does not sort the DataFrame by COLOR, even though that is what you told it to do. Or does it?

The problem is that you have quotation marks around "COLOR" in order by "COLOR". That makes it the string literal "COLOR" rather than the column COLOR, so the window is ordered by a constant: every row ties, and the row numbering becomes nondeterministic.
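The same mistake is easy to reproduce in the DataFrame API. A minimal sketch (assuming the usual f alias for pyspark.sql.functions), where f.lit('COLOR') plays the role of the quoted string:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Ordering by the literal f.lit('COLOR') orders by a constant, so every
# row ties and row_number() is assigned nondeterministically, just like
# order by "COLOR" in the SQL above.
bad = df.withColumn('ROW_ID', f.row_number().over(Window.orderBy(f.lit('COLOR'))))

# Ordering by the column itself gives a stable numbering by COLOR
# (up to ties within the same COLOR).
good = df.withColumn('ROW_ID', f.row_number().over(Window.orderBy('COLOR')))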

Consider the difference in the execution plans:

spark.sql('select row_number() over (order by "COLOR") as ROW_ID, * from df').explain()
#== Physical Plan ==
#*Project [ROW_ID#350, COLOR#326, COL1#327L, COL2#328L, COL3#329L]
#+- Window [row_number() windowspecdefinition(COLOR ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS ROW_ID#350], [COLOR ASC NULLS FIRST]
#   +- *Sort [COLOR ASC NULLS FIRST], false, 0
#      +- Exchange SinglePartition
#         +- Scan ExistingRDD[COLOR#326,COL1#327L,COL2#328L,COL3#329L]

spark.sql('select row_number() over (order by COLOR) as ROW_ID, * from df').explain()
#== Physical Plan ==
#*Project [ROW_ID#342, COLOR#326, COL1#327L, COL2#328L, COL3#329L]
#+- Window [row_number() windowspecdefinition(COLOR#326 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS ROW_ID#342], [COLOR#326 ASC NULLS FIRST]
#   +- *Sort [COLOR#326 ASC NULLS FIRST], false, 0
#      +- Exchange SinglePartition
#         +- Scan ExistingRDD[COLOR#326,COL1#327L,COL2#328L,COL3#329L]

You can see that the second (correct) one has COLOR#326 in the window spec definition, which shows that it is ordering by the actual column.
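You can also check directly that a double-quoted "COLOR" parses as a string literal rather than a column reference (by default, Spark SQL treats double quotes like single quotes):

spark.sql('select "COLOR" as C from df').show(2)
#+-----+
#|    C|
#+-----+
#|COLOR|
#|COLOR|
#+-----+
#only showing top 2 rows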

So if you remove the quotes, you should see much more consistent results:

df = spark.sql('select row_number() over (order by COLOR) as ROW_ID, * from df')
df.show()
#+------+-----+----+----+----+
#|ROW_ID|COLOR|COL1|COL2|COL3|
#+------+-----+----+----+----+
#|     1| Blue|   2|   3|   1|
#|     2| Blue|   2|   4|   1|
#|     3|Green|   1|   5|   4|
#|     4|Green|   4|   1|   3|
#|     5|Green|   1|   5|   2|
#+------+-----+----+----+----+

Notice that the DataFrame is now actually sorted by COLOR.

grp_df = df.groupby('COLOR').agg(f.collect_list('ROW_ID').alias('IDX_VAL'))
grp_df.show()
#+-----+---------+
#|COLOR|  IDX_VAL|
#+-----+---------+
#| Blue|   [1, 2]|
#|Green|[3, 4, 5]|
#+-----+---------+

However, you may still get inconsistent ordering within the same COLOR, because you have not specified how ties should be broken after sorting by COLOR. That does not affect the grouped values in this particular case, but it is advisable to make the ordering deterministic in all cases.

For your data, you can get deterministic results with:

df = spark.sql(
    'select row_number() over (order by COLOR, COL1, COL2, COL3) as ROW_ID, * from df'
)
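
Alternatively, if only the contents of IDX_VAL matter and not the order in which collect_list happened to see the rows, you can sort the collected array itself; a short sketch:

from pyspark.sql import functions as f

# sort_array makes the collected list deterministic regardless of the
# row order collect_list received after the shuffle
grp_df = df.groupby('COLOR').agg(
    f.sort_array(f.collect_list('ROW_ID')).alias('IDX_VAL')
)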