在 Pyspark 中展平分组

Flatten Group By in Pyspark

我有一个 pyspark 数据框。例如,

d= hiveContext.createDataFrame([("A", 1), ("B", 2), ("D", 3), ("D", 3),  ("A", 4), ("D", 3)],["Col1", "Col2"])

+----+----+
|Col1|Col2|
+----+----+
|   A|   1|
|   B|   2|
|   D|   3|
|   D|   3|
|   A|   4|
|   D|   3|
+----+----+

我想按 Col1 分组,然后创建 Col2 的列表。我需要展平这些团体。我确实有很多专栏。

+----+----------+
|Col1|      Col2|
+----+----------+
|   A|   [1,4]  |
|   B|   [2]    |
|   D|   [3,3,3]|
+----+----------+

您可以执行 groupBy() 并使用 collect_list() 作为聚合函数:

import pyspark.sql.functions as f
d.groupBy('Col1').agg(f.collect_list('Col2').alias('Col2')).show()
#+----+---------+
#|Col1|     Col2|
#+----+---------+
#|   B|      [2]|
#|   D|[3, 3, 3]|
#|   A|   [1, 4]|
#+----+---------+

更新

如果要合并多个列,可以对每个列使用 collect_list(),然后使用 struct()udf() 合并结果列表。考虑以下示例:

创建虚拟数据

from operator import add
import pyspark.sql.functions as f

# create example dataframe
d = sqlcx.createDataFrame(
    [
        ("A", 1, 10),
        ("B", 2, 20),
        ("D", 3, 30),
        ("D", 3, 10),
        ("A", 4, 20),
        ("D", 3, 30)
    ],
    ["Col1", "Col2", "Col3"]
)

将所需的列收集到列表中

假设您有一个要收集到列表中的列的列表。您可以执行以下操作:

cols_to_combine = ['Col2', 'Col3']
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine]).show()
#+----+---------+------------+
#|Col1|     Col2|        Col3|
#+----+---------+------------+
#|   B|      [2]|        [20]|
#|   D|[3, 3, 3]|[30, 10, 30]|
#|   A|   [4, 1]|    [20, 10]|
#+----+---------+------------+

将结果列表合并为一列

现在我们要将列表列合并为一个列表。如果我们使用 struct(),我们将得到以下内容:

d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
    .select('Col1', f.struct(*cols_to_combine).alias('Combined'))\
    .show(truncate=False)
#+----+------------------------------------------------+
#|Col1|Combined                                        |
#+----+------------------------------------------------+
#|B   |[WrappedArray(2),WrappedArray(20)]              |
#|D   |[WrappedArray(3, 3, 3),WrappedArray(10, 30, 30)]|
#|A   |[WrappedArray(1, 4),WrappedArray(10, 20)]       |
#+----+------------------------------------------------+

展平环绕数组

快到了。我们只需要组合 WrappedArrays。我们可以通过 udf():

来实现
combine_wrapped_arrays = f.udf(lambda val: reduce(add, val), ArrayType(IntegerType()))
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
    .select('Col1', combine_wrapped_arrays(f.struct(*cols_to_combine)).alias('Combined'))\
    .show(truncate=False)
#+----+---------------------+
#|Col1|Combined             |
#+----+---------------------+
#|B   |[2, 20]              |
#|D   |[3, 3, 3, 30, 10, 30]|
#|A   |[1, 4, 10, 20]       |
#+----+---------------------+

参考资料

  • Pyspark Merge WrappedArrays Within a Dataframe

更新 2

更简单的方法,无需处理WrappedArrays:

from operator import add

combine_udf = lambda cols: f.udf(
    lambda *args: reduce(add, args),
    ArrayType(IntegerType())
)

d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
    .select('Col1', combine_udf(cols_to_combine)(*cols_to_combine).alias('Combined'))\
    .show(truncate=False)
#+----+---------------------+
#|Col1|Combined             |
#+----+---------------------+
#|B   |[2, 20]              |
#|D   |[3, 3, 3, 30, 10, 30]|
#|A   |[1, 4, 10, 20]       |
#+----+---------------------+

注意:这最后一步仅在所有列的数据类型都相同时才有效。您不能使用此函数将包装数组与混合类型组合起来。

从 spark 2.4 开始你可以使用 pyspark.sql.functions.flatten

import pyspark.sql.functions as f
df.groupBy('Col1').agg(f.flatten(f.collect_list('Col2')).alias('Col2')).show()