RDD 拆分并在新的 RDD 上进行聚合
RDD split and do aggregation on new RDDs
我有一个 (String,String,Int)
的 RDD。
- 我想根据前两个字符串进行缩减
- 然后根据第一个字符串,我想对 (String,Int) 进行分组并对它们进行排序
- 排序后我需要将它们分成小组,每个小组包含 n 个元素。
我已经完成了下面的代码。问题是步骤 2 中的元素数量对于单个键来说非常大
reduceByKey(x++y)
需要很多时间。
//Input
val data = Array(
("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1),
("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))
val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))
// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)
// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList))
问题是 "c1" 有很多独特的条目,例如 b1 ,b2....million 并且 reduceByKey
正在消磨时间,因为所有值都将转到单个节点。
有没有办法更有效地实现这一目标?
// output
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))
数据分组的方式至少没有什么问题。第一个问题由
介绍
mapValues(x => ArrayBuffer(x))
它创建了大量不提供额外价值的可变对象,因为您无法在随后的 reduceByKey
中利用它们的可变性
reduceByKey((x, y) => x ++ y)
其中每个 ++
创建一个新集合,并且两个参数都不能安全地改变。由于 reduceByKey
应用了 map 端聚合,情况更糟,几乎造成了 GC 地狱。
Is there a way to achieve this more efficiently?
除非您对数据分布有更深入的了解,可用于定义更智能的分区器,否则最简单的改进是将 mapValues
+ reduceByKey
替换为简单的 groupByKey
:
val r3 = r2.groupByKey
也应该可以为 reduceByKey
调用和 mapPartitions
使用自定义分区程序,使用 preservesPartitioning
而不是 map
。
class FirsElementPartitioner(partitions: Int)
extends org.apache.spark.Partitioner {
def numPartitions = partitions
def getPartition(key: Any): Int = {
key.asInstanceOf[(Any, Any)]._1.## % numPartitions
}
}
val r2 = r1
.reduceByKey(new FirsElementPartitioner(8), (x, y) => x + y)
.mapPartitions(iter => iter.map(x => ((x._1._1), (x._1._2, x._2))), true)
// No shuffle required here.
val r3 = r2.groupByKey
它只需要一次洗牌,groupByKey
只是一个本地操作:
r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
// | MapPartitionsRDD[40] at mapPartitions at <console>:35 []
// | ShuffledRDD[39] at reduceByKey at <console>:34 []
// +-(8) MapPartitionsRDD[1] at map at <console>:28 []
// | ParallelCollectionRDD[0] at parallelize at <console>:26 []
我有一个 (String,String,Int)
的 RDD。
- 我想根据前两个字符串进行缩减
- 然后根据第一个字符串,我想对 (String,Int) 进行分组并对它们进行排序
- 排序后我需要将它们分成小组,每个小组包含 n 个元素。
我已经完成了下面的代码。问题是步骤 2 中的元素数量对于单个键来说非常大
reduceByKey(x++y)
需要很多时间。
//Input
val data = Array(
("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1),
("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))
val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))
// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)
// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList))
问题是 "c1" 有很多独特的条目,例如 b1 ,b2....million 并且 reduceByKey
正在消磨时间,因为所有值都将转到单个节点。
有没有办法更有效地实现这一目标?
// output
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))
数据分组的方式至少没有什么问题。第一个问题由
介绍 mapValues(x => ArrayBuffer(x))
它创建了大量不提供额外价值的可变对象,因为您无法在随后的 reduceByKey
reduceByKey((x, y) => x ++ y)
其中每个 ++
创建一个新集合,并且两个参数都不能安全地改变。由于 reduceByKey
应用了 map 端聚合,情况更糟,几乎造成了 GC 地狱。
Is there a way to achieve this more efficiently?
除非您对数据分布有更深入的了解,可用于定义更智能的分区器,否则最简单的改进是将 mapValues
+ reduceByKey
替换为简单的 groupByKey
:
val r3 = r2.groupByKey
也应该可以为 reduceByKey
调用和 mapPartitions
使用自定义分区程序,使用 preservesPartitioning
而不是 map
。
class FirsElementPartitioner(partitions: Int)
extends org.apache.spark.Partitioner {
def numPartitions = partitions
def getPartition(key: Any): Int = {
key.asInstanceOf[(Any, Any)]._1.## % numPartitions
}
}
val r2 = r1
.reduceByKey(new FirsElementPartitioner(8), (x, y) => x + y)
.mapPartitions(iter => iter.map(x => ((x._1._1), (x._1._2, x._2))), true)
// No shuffle required here.
val r3 = r2.groupByKey
它只需要一次洗牌,groupByKey
只是一个本地操作:
r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
// | MapPartitionsRDD[40] at mapPartitions at <console>:35 []
// | ShuffledRDD[39] at reduceByKey at <console>:34 []
// +-(8) MapPartitionsRDD[1] at map at <console>:28 []
// | ParallelCollectionRDD[0] at parallelize at <console>:26 []