Spark GroupBy 同时维护带有空值的模式
Spark GroupBy While Maintaining Schema With Nulls
我有一个包含多个 JSON 对象的文件,其架构如下:
{A: struct, B: struct, C: struct, D: struct}
与 属性 的 A
的值永远不会为空;但是,B, C,
或 D
中只有一个也可以是非空的。例如,我们会在数据框中看到类似这样的内容:
+----+----+----+----+
| A | B | C | D |
+----+----+----+----+
|[..]|[..]|null|null|
|[..]|null|[..]|null|
|[..]|null|null|[..]|
+----+----+----+----+
我正在尝试按 A
对数据框进行分组,同时保持 (A,B,C,D)
的相同 schema/column 结构,以便给定行中的所有值都是非空的。
A
和任何 B,C,D
之间可能存在多对一关系,在这种情况下,我会将架构更改为
{A: struct, B: list, C: list, D: list}
,但仍保留列名。
我对 Spark 和 Scala 还很陌生,只能以程序化的方式组织我的想法,我遍历每一行并在 A
上进行散列,然后以这种方式构建一个完整的行,但我不相信这是一个干净的解决方案,我也无法使用 spark API.
有效地表达它
评论部分有点笨拙,所以这里有一个例子,你可以如何做到这一点
scala> case class Foo(a:String, b:String, c:String)
defined class Foo
scala> val ds = spark.createDataset(List(Foo("1","1",null), Foo("1",null,null), Foo("1","3",null), Foo("1", null, null)))
ds: org.apache.spark.sql.Dataset[Foo] = [a: string, b: string ... 1 more field]
scala> val collected = ds.groupBy(ds("a")).agg(collect_list(ds("b")).alias("b"), collect_list(ds("c")).alias("c"))
collected: org.apache.spark.sql.DataFrame = [a: string, b: array<string> ... 1 more field]
scala> val filtered = collected.where(size(collected("b")) > 0 and size(collected("c")) > 0)
filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: string, b: array<string> ... 1 more field]
scala> collected.show
+---+------+---+
| a| b| c|
+---+------+---+
| 1|[1, 3]| []|
+---+------+---+
scala> filtered.show
+---+---+---+
| a| b| c|
+---+---+---+
+---+---+---+
val df = spark.createDataFrame(
sc.parallelize(
Seq(Row(1, 2, 3, 4), Row(1, 3, 4, null),
Row(2, null, 4, null), Row(2, 2, 2, null))),
StructType(Seq("A","B","C","D")
.map(StructField(_, IntegerType, true))
)
)
df.show()
+---+----+---+----+
| A| B| C| D|
+---+----+---+----+
| 1| 2| 3| 4|
| 1| 3| 4|null|
| 2|null| 4|null|
| 2| 2| 2|null|
+---+----+---+----+
df
.groupBy("A")
.agg(collect_list('B) as "B",
collect_list('C) as "C",
collect_list('D) as "D")
.show
+---+------+------+---+
| A| B| C| D|
+---+------+------+---+
| 1|[2, 3]|[3, 4]|[4]|
| 2| [2]|[4, 2]| []|
+---+------+------+---+
默认情况下,collect_list 不收集空值,这正是您想要的(如果所有值为空,您将得到一个空列表)。使用 collect_set 避免重复。
我有一个包含多个 JSON 对象的文件,其架构如下:
{A: struct, B: struct, C: struct, D: struct}
与 属性 的 A
的值永远不会为空;但是,B, C,
或 D
中只有一个也可以是非空的。例如,我们会在数据框中看到类似这样的内容:
+----+----+----+----+
| A | B | C | D |
+----+----+----+----+
|[..]|[..]|null|null|
|[..]|null|[..]|null|
|[..]|null|null|[..]|
+----+----+----+----+
我正在尝试按 A
对数据框进行分组,同时保持 (A,B,C,D)
的相同 schema/column 结构,以便给定行中的所有值都是非空的。
A
和任何 B,C,D
之间可能存在多对一关系,在这种情况下,我会将架构更改为
{A: struct, B: list, C: list, D: list}
,但仍保留列名。
我对 Spark 和 Scala 还很陌生,只能以程序化的方式组织我的想法,我遍历每一行并在 A
上进行散列,然后以这种方式构建一个完整的行,但我不相信这是一个干净的解决方案,我也无法使用 spark API.
评论部分有点笨拙,所以这里有一个例子,你可以如何做到这一点
scala> case class Foo(a:String, b:String, c:String)
defined class Foo
scala> val ds = spark.createDataset(List(Foo("1","1",null), Foo("1",null,null), Foo("1","3",null), Foo("1", null, null)))
ds: org.apache.spark.sql.Dataset[Foo] = [a: string, b: string ... 1 more field]
scala> val collected = ds.groupBy(ds("a")).agg(collect_list(ds("b")).alias("b"), collect_list(ds("c")).alias("c"))
collected: org.apache.spark.sql.DataFrame = [a: string, b: array<string> ... 1 more field]
scala> val filtered = collected.where(size(collected("b")) > 0 and size(collected("c")) > 0)
filtered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: string, b: array<string> ... 1 more field]
scala> collected.show
+---+------+---+
| a| b| c|
+---+------+---+
| 1|[1, 3]| []|
+---+------+---+
scala> filtered.show
+---+---+---+
| a| b| c|
+---+---+---+
+---+---+---+
val df = spark.createDataFrame(
sc.parallelize(
Seq(Row(1, 2, 3, 4), Row(1, 3, 4, null),
Row(2, null, 4, null), Row(2, 2, 2, null))),
StructType(Seq("A","B","C","D")
.map(StructField(_, IntegerType, true))
)
)
df.show()
+---+----+---+----+
| A| B| C| D|
+---+----+---+----+
| 1| 2| 3| 4|
| 1| 3| 4|null|
| 2|null| 4|null|
| 2| 2| 2|null|
+---+----+---+----+
df
.groupBy("A")
.agg(collect_list('B) as "B",
collect_list('C) as "C",
collect_list('D) as "D")
.show
+---+------+------+---+
| A| B| C| D|
+---+------+------+---+
| 1|[2, 3]|[3, 4]|[4]|
| 2| [2]|[4, 2]| []|
+---+------+------+---+
默认情况下,collect_list 不收集空值,这正是您想要的(如果所有值为空,您将得到一个空列表)。使用 collect_set 避免重复。