将 groupByKey 替换为 Spark 中的 reduceByKey

Replace groupByKey with reduceByKey in Spark

你好,我经常需要在我的代码中使用 groupByKey,但我知道这是一个非常繁重的操作。由于我正在努力提高性能,所以我想知道我删除所有 groupByKey 调用的方法是否有效。

我习惯于从另一个 RDD 创建一个 RDD 并创建类型对 (Int, Int)

rdd1 = [(1, 2), (1, 3), (2 , 3), (2, 4), (3, 5)]

因为我需要得到这样的东西:

[(1, [2, 3]), (2 , [3, 4]), (3, [5])]

我使用的是out = rdd1.groupByKey,但由于这种方法对于庞大的数据集可能会有很大问题,所以我想使用这个解决方案:

我没有创建成对类型 (Int, Int) 的 RDD rdd1,而是创建了成对类型 (Int, List[Int]) 所以我的 rdd1 是像这样

rdd1 = [(1, [2]), (1, [3]), (2 , [3]), (2, [4]), (3, [5])]

但这次为了达到相同的结果,我使用 reduceByKey(_ ::: _) 按键连接所有值,这应该更快。您认为使用这种方法可能会提高性能吗?我担心这种类型 (Int, List[Int]) 创建一对值是只包含 1 个元素的列表是不是很愚蠢?

您认为是否有更快的方法来达到相同的结果,即使用其他方法?谢谢。

Since I'm working to improve performance I was wondering if my approach to remove all groupByKey calls is efficient.

查看 RDD.toDebugString 以了解您的 RDD 转换的逻辑计划。这应该可以让您很好地了解您的行动将有多快(或多慢)。

避免 ShuffledRDD,因为它们会引发通常非常昂贵的洗牌操作。

关于您使用 reduceByKey 的想法,请考虑 keyBy,例如

rdd.keyBy(_.kind).reduceByKey(....)

您也可以将 aggregateByKey 视为最一般的转换(位于 groupBy 和亲戚之后)。

最后但同样重要的是,groupBy 有两个允许定义分区数或 Partitioner 的变体。这些可以避免昂贵的洗牌。

继续阅读 org.apache.spark.rdd.PairRDDFunctions

使用网络 UI 可以更好地了解 "queries" 的性能。了解你的数据会有很大帮助。也要花足够的时间(否则可能会浪费优化查询的时间)。

如果你的 end 结果是

,我认为你不应该使用 reduceByKey
[(1, [2, 3]), (2 , [3, 4]), (3, [5])]

为什么?因为这就是 groupByKey 的用途,所以它可能做得最好。

groupByKey 的问题是您通常不需要具有相同键的所有值的列表(或数组),但是您可以从该列表中获得一些东西。如果你真的不需要列表,你可能可以使用 reduceByKey.

在与洗牌相同的步骤中进行减少

reduceByKey的两大优点:

  • 它可以在洗牌之前开始减少(减少同一个执行器上的值,以避免不必要的网络负载)
  • 它从不将具有相同键值的整个数组加载到内存中。这在大型数据集中很重要,其中数组可能有几 GB 大。

在您的情况下,正如您介绍的那样,第一点不是很重要(因为没有真正减少数据,只是串联),第二点不适用,因为您需要整个列表。

但是,我强烈建议您考虑一下您是否真的需要整个列表,或者这是否只是您计算中的一个步骤,尤其是在您处理大型数据集时。

回答这个问题可能有点晚了。它可能会帮助别人。

val tuples = List((1, 2), (1, 3), (2 , 3), (2, 4), (3, 5))
val context = getContext() // get Spark Context.
val tuplesRDD = context.parallelize(tuples)

val list = mutable.MutableList.empty[Int]
val addItemsToList = (s: mutable.MutableList[Int], v : Int) => s +=  v
val mergeLists = (x: mutable.MutableList[Int], 
                  y: mutable.MutableList[Int]) => x ++= y

val groupByKey = tuplesRDD.aggregateByKey(list)(addItemsToList, mergeLists)
groupByKey.cache()
groupByKey.foreach(x => println(x))

输出

(1,MutableList(2, 3))
(2,MutableList(3, 4))
(3,MutableList(5))