为什么Spark中的collect_list不使用部分聚合
Why does collect_list in Spark not use partial aggregation
我最近在玩UDAF,查看了内置聚合函数collect_list
的源代码,我惊讶地发现collect_list
没有merge
方法实现了,虽然我认为这真的很简单(只是连接两个数组)。代码取自 org.apache.spark.sql.catalyst.expressions.aggregate.collect.Collect
override def merge(buffer: InternalRow, input: InternalRow): Unit = {
sys.error("Collect cannot be used in partial aggregations.")
}
情况已不再如此,因为 SPARK-1893 但我假设最初的设计主要考虑 collect_list
。
因为 collect_list
在逻辑上等同于 groupByKey
动机将完全相同,以避免长时间的 GC 暂停。特别是 groupByKey
中的地图侧组合已被 Spark SPARK-772:
禁用
Map side combine in group by key case does not reduce the amount of data shuffled. Instead, it forces a lot more objects to go into old gen, and leads to worse GC.
所以要对你发表评论
I think this is really straight-farward (just concatenate two Arrays).
它可能很简单,但它不会增加太多价值(除非在它之上有另一个减少操作)并且序列连接很昂贵。
我最近在玩UDAF,查看了内置聚合函数collect_list
的源代码,我惊讶地发现collect_list
没有merge
方法实现了,虽然我认为这真的很简单(只是连接两个数组)。代码取自 org.apache.spark.sql.catalyst.expressions.aggregate.collect.Collect
override def merge(buffer: InternalRow, input: InternalRow): Unit = {
sys.error("Collect cannot be used in partial aggregations.")
}
情况已不再如此,因为 SPARK-1893 但我假设最初的设计主要考虑 collect_list
。
因为 collect_list
在逻辑上等同于 groupByKey
动机将完全相同,以避免长时间的 GC 暂停。特别是 groupByKey
中的地图侧组合已被 Spark SPARK-772:
Map side combine in group by key case does not reduce the amount of data shuffled. Instead, it forces a lot more objects to go into old gen, and leads to worse GC.
所以要对你发表评论
I think this is really straight-farward (just concatenate two Arrays).
它可能很简单,但它不会增加太多价值(除非在它之上有另一个减少操作)并且序列连接很昂贵。