Spark GroupBy 同时维护带有空值的模式

Spark GroupBy While Maintaining Schema With Nulls

我有一个包含多个 JSON 对象的文件,其架构如下:

{A: struct, B: struct, C: struct, D: struct}

与 属性 的 A 的值永远不会为空;但是,B, C,D 中只有一个也可以是非空的。例如,我们会在数据框中看到类似这样的内容:

+----+----+----+----+
| A  | B  | C  | D  |
+----+----+----+----+
|[..]|[..]|null|null|
|[..]|null|[..]|null|
|[..]|null|null|[..]|
+----+----+----+----+

我正在尝试按 A 对数据框进行分组,同时保持 (A,B,C,D) 的相同 schema/column 结构,以便给定行中的所有值都是非空的。

A 和任何 B,C,D 之间可能存在多对一关系,在这种情况下,我会将架构更改为

{A: struct, B: list, C: list, D: list},但仍保留列名。

我对 Spark 和 Scala 还很陌生,只能以程序化的方式组织我的想法,我遍历每一行并在 A 上进行散列,然后以这种方式构建一个完整的行,但我不相信这是一个干净的解决方案,我也无法使用 spark API.

有效地表达它

评论部分有点笨拙,所以这里有一个例子,你可以如何做到这一点

scala> case class Foo(a:String, b:String, c:String)
defined class Foo

scala> val ds = spark.createDataset(List(Foo("1","1",null), Foo("1",null,null), Foo("1","3",null), Foo("1", null, null)))
ds: org.apache.spark.sql.Dataset[Foo] = [a: string, b: string ... 1 more field]

scala> val collected = ds.groupBy(ds("a")).agg(collect_list(ds("b")).alias("b"), collect_list(ds("c")).alias("c"))
collected: org.apache.spark.sql.DataFrame = [a: string, b: array<string> ... 1 more field]

scala> val filtered = collected.where(size(collected("b")) > 0 and size(collected("c")) > 0)
filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: string, b: array<string> ... 1 more field]

scala> collected.show
+---+------+---+
|  a|     b|  c|
+---+------+---+
|  1|[1, 3]| []|
+---+------+---+


scala> filtered.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
+---+---+---+
val df = spark.createDataFrame(
     sc.parallelize(
         Seq(Row(1, 2, 3, 4), Row(1, 3, 4, null),
             Row(2, null, 4, null), Row(2, 2, 2, null))),
         StructType(Seq("A","B","C","D")
                        .map(StructField(_, IntegerType, true))
     )
)

df.show()
+---+----+---+----+
|  A|   B|  C|   D|
+---+----+---+----+
|  1|   2|  3|   4|
|  1|   3|  4|null|
|  2|null|  4|null|
|  2|   2|  2|null|
+---+----+---+----+

df
    .groupBy("A")
    .agg(collect_list('B) as "B", 
         collect_list('C) as "C",
         collect_list('D) as "D")
    .show

+---+------+------+---+
|  A|     B|     C|  D|
+---+------+------+---+
|  1|[2, 3]|[3, 4]|[4]|
|  2|   [2]|[4, 2]| []|
+---+------+------+---+

默认情况下,collect_list 不收集空值,这正是您想要的(如果所有值为空,您将得到一个空列表)。使用 collect_set 避免重复。