RDD 拆分并在新的 RDD 上进行聚合

RDD split and do aggregation on new RDDs

我有一个 (String,String,Int) 的 RDD。

  1. 我想根据前两个字符串进行缩减
  2. 然后根据第一个字符串,我想对 (String,Int) 进行分组并对它们进行排序
  3. 排序后我需要将它们分成小组,每个小组包含 n 个元素。

我已经完成了下面的代码。问题是步骤 2 中的元素数量对于单个键来说非常大 reduceByKey(x++y) 需要很多时间。

//Input
val data = Array(
  ("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1), 
  ("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))

val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))

// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y) 

// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList)) 

问题是 "c1" 有很多独特的条目,例如 b1 ,b2....million 并且 reduceByKey 正在消磨时间,因为所有值都将转到单个节点。 有没有办法更有效地实现这一目标?

// output
 Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))

数据分组的方式至少没有什么问题。第一个问题由

介绍
 mapValues(x => ArrayBuffer(x))

它创建了大量不提供额外价值的可变对象,因为您无法在随后的 reduceByKey

中利用它们的可变性
reduceByKey((x, y) => x ++ y) 

其中每个 ++ 创建一个新集合,并且两个参数都不能安全地改变。由于 reduceByKey 应用了 map 端聚合,情况更糟,几乎造成了 GC 地狱。

Is there a way to achieve this more efficiently?

除非您对数据分布有更深入的了解,可用于定义更智能的分区器,否则最简单的改进是将 mapValues + reduceByKey 替换为简单的 groupByKey:

val r3 = r2.groupByKey

也应该可以为 reduceByKey 调用和 mapPartitions 使用自定义分区程序,使用 preservesPartitioning 而不是 map

class FirsElementPartitioner(partitions: Int)
    extends org.apache.spark.Partitioner {
  def numPartitions  = partitions
  def getPartition(key: Any): Int = {
    key.asInstanceOf[(Any, Any)]._1.## % numPartitions
  }
}

val r2 = r1
  .reduceByKey(new FirsElementPartitioner(8), (x, y) => x + y)
  .mapPartitions(iter => iter.map(x => ((x._1._1), (x._1._2, x._2))), true)

// No shuffle required here.
val r3 = r2.groupByKey

它只需要一次洗牌,groupByKey 只是一个本地操作:

r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
//  |  MapPartitionsRDD[40] at mapPartitions at <console>:35 []
//  |  ShuffledRDD[39] at reduceByKey at <console>:34 []
//  +-(8) MapPartitionsRDD[1] at map at <console>:28 []
//     |  ParallelCollectionRDD[0] at parallelize at <console>:26 []