Spark SQL - 聚合集合?
Spark SQL - Aggregate collections?
假设我有 2 个数据框。
DF1 在各行的 A 列中可能有值 {3, 4, 5}。
DF2 在各行的 A 列中可能有值 {4, 5, 6}。
我可以使用 distinct_set(A) 将这些聚合到一组不同的元素中,假设所有这些行都属于同一组。
此时我在生成的数据框中有一个集合。无论如何要将该集合与另一集合聚合?基本上,如果第一次聚合产生 2 个数据帧,我希望能够聚合它们的结果。
虽然 explode 和 collect_set 可以解决这个问题,但编写自定义聚合器来合并集合本身更有意义。它们的底层结构是一个 WrappedArray。
case class SetMergeUDAF() extends UserDefinedAggregateFunction {
def deterministic: Boolean = false
def inputSchema: StructType = StructType(StructField("input", ArrayType(LongType)) :: Nil)
def bufferSchema: StructType = StructType(StructField("buffer", ArrayType(LongType)) :: Nil)
def dataType: DataType = ArrayType(LongType)
def initialize(buf: MutableAggregationBuffer): Unit = {
buf(0) = mutable.WrappedArray.empty[LongType]
}
def update(buf: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
val result : mutable.WrappedArray[LongType] = mutable.WrappedArray.empty[LongType]
val x = result ++ (buf.getAs[mutable.WrappedArray[Long]](0).toSet ++ input.getAs[mutable.WrappedArray[Long]](0).toSet).toArray[Long]
buf(0) = x
}
}
def merge(buf1: MutableAggregationBuffer, buf2: Row): Unit = {
val result : mutable.WrappedArray[LongType] = mutable.WrappedArray.empty[LongType]
val x = result ++ (buf1.getAs[mutable.WrappedArray[Long]](0).toSet ++ buf2.getAs[mutable.WrappedArray[Long]](0).toSet).toArray[Long]
buf1(0) = x
}
def evaluate(buf: Row): Any = buf.getAs[mutable.WrappedArray[LongType]](0)
}
假设我有 2 个数据框。
DF1 在各行的 A 列中可能有值 {3, 4, 5}。
DF2 在各行的 A 列中可能有值 {4, 5, 6}。
我可以使用 distinct_set(A) 将这些聚合到一组不同的元素中,假设所有这些行都属于同一组。
此时我在生成的数据框中有一个集合。无论如何要将该集合与另一集合聚合?基本上,如果第一次聚合产生 2 个数据帧,我希望能够聚合它们的结果。
虽然 explode 和 collect_set 可以解决这个问题,但编写自定义聚合器来合并集合本身更有意义。它们的底层结构是一个 WrappedArray。
case class SetMergeUDAF() extends UserDefinedAggregateFunction {
def deterministic: Boolean = false
def inputSchema: StructType = StructType(StructField("input", ArrayType(LongType)) :: Nil)
def bufferSchema: StructType = StructType(StructField("buffer", ArrayType(LongType)) :: Nil)
def dataType: DataType = ArrayType(LongType)
def initialize(buf: MutableAggregationBuffer): Unit = {
buf(0) = mutable.WrappedArray.empty[LongType]
}
def update(buf: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
val result : mutable.WrappedArray[LongType] = mutable.WrappedArray.empty[LongType]
val x = result ++ (buf.getAs[mutable.WrappedArray[Long]](0).toSet ++ input.getAs[mutable.WrappedArray[Long]](0).toSet).toArray[Long]
buf(0) = x
}
}
def merge(buf1: MutableAggregationBuffer, buf2: Row): Unit = {
val result : mutable.WrappedArray[LongType] = mutable.WrappedArray.empty[LongType]
val x = result ++ (buf1.getAs[mutable.WrappedArray[Long]](0).toSet ++ buf2.getAs[mutable.WrappedArray[Long]](0).toSet).toArray[Long]
buf1(0) = x
}
def evaluate(buf: Row): Any = buf.getAs[mutable.WrappedArray[LongType]](0)
}