将 groupByKey 替换为 Spark 中的 reduceByKey
Replace groupByKey with reduceByKey in Spark
你好,我经常需要在我的代码中使用 groupByKey
,但我知道这是一个非常繁重的操作。由于我正在努力提高性能,所以我想知道我删除所有 groupByKey
调用的方法是否有效。
我习惯于从另一个 RDD 创建一个 RDD 并创建类型对 (Int, Int)
rdd1 = [(1, 2), (1, 3), (2 , 3), (2, 4), (3, 5)]
因为我需要得到这样的东西:
[(1, [2, 3]), (2 , [3, 4]), (3, [5])]
我使用的是out = rdd1.groupByKey
,但由于这种方法对于庞大的数据集可能会有很大问题,所以我想使用这个解决方案:
我没有创建成对类型 (Int, Int) 的 RDD rdd1
,而是创建了成对类型 (Int, List[Int]) 所以我的 rdd1
是像这样
rdd1 = [(1, [2]), (1, [3]), (2 , [3]), (2, [4]), (3, [5])]
但这次为了达到相同的结果,我使用 reduceByKey(_ ::: _)
按键连接所有值,这应该更快。您认为使用这种方法可能会提高性能吗?我担心这种类型 (Int, List[Int]) 创建一对值是只包含 1 个元素的列表是不是很愚蠢?
您认为是否有更快的方法来达到相同的结果,即使用其他方法?谢谢。
Since I'm working to improve performance I was wondering if my approach to remove all groupByKey calls is efficient.
查看 RDD.toDebugString
以了解您的 RDD 转换的逻辑计划。这应该可以让您很好地了解您的行动将有多快(或多慢)。
避免 ShuffledRDD
,因为它们会引发通常非常昂贵的洗牌操作。
关于您使用 reduceByKey
的想法,请考虑 keyBy
,例如
rdd.keyBy(_.kind).reduceByKey(....)
您也可以将 aggregateByKey
视为最一般的转换(位于 groupBy
和亲戚之后)。
最后但同样重要的是,groupBy
有两个允许定义分区数或 Partitioner
的变体。这些可以避免昂贵的洗牌。
继续阅读 org.apache.spark.rdd.PairRDDFunctions。
使用网络 UI 可以更好地了解 "queries" 的性能。了解你的数据会有很大帮助。也要花足够的时间(否则可能会浪费优化查询的时间)。
如果你的 end 结果是
,我认为你不应该使用 reduceByKey
[(1, [2, 3]), (2 , [3, 4]), (3, [5])]
为什么?因为这就是 groupByKey
的用途,所以它可能做得最好。
groupByKey
的问题是您通常不需要具有相同键的所有值的列表(或数组),但是您可以从该列表中获得一些东西。如果你真的不需要列表,你可能可以使用 reduceByKey
.
在与洗牌相同的步骤中进行减少
reduceByKey
的两大优点:
- 它可以在洗牌之前开始减少(减少同一个执行器上的值,以避免不必要的网络负载)
- 它从不将具有相同键值的整个数组加载到内存中。这在大型数据集中很重要,其中数组可能有几 GB 大。
在您的情况下,正如您介绍的那样,第一点不是很重要(因为没有真正减少数据,只是串联),第二点不适用,因为您需要整个列表。
但是,我强烈建议您考虑一下您是否真的需要整个列表,或者这是否只是您计算中的一个步骤,尤其是在您处理大型数据集时。
回答这个问题可能有点晚了。它可能会帮助别人。
val tuples = List((1, 2), (1, 3), (2 , 3), (2, 4), (3, 5))
val context = getContext() // get Spark Context.
val tuplesRDD = context.parallelize(tuples)
val list = mutable.MutableList.empty[Int]
val addItemsToList = (s: mutable.MutableList[Int], v : Int) => s += v
val mergeLists = (x: mutable.MutableList[Int],
y: mutable.MutableList[Int]) => x ++= y
val groupByKey = tuplesRDD.aggregateByKey(list)(addItemsToList, mergeLists)
groupByKey.cache()
groupByKey.foreach(x => println(x))
输出
(1,MutableList(2, 3))
(2,MutableList(3, 4))
(3,MutableList(5))
你好,我经常需要在我的代码中使用 groupByKey
,但我知道这是一个非常繁重的操作。由于我正在努力提高性能,所以我想知道我删除所有 groupByKey
调用的方法是否有效。
我习惯于从另一个 RDD 创建一个 RDD 并创建类型对 (Int, Int)
rdd1 = [(1, 2), (1, 3), (2 , 3), (2, 4), (3, 5)]
因为我需要得到这样的东西:
[(1, [2, 3]), (2 , [3, 4]), (3, [5])]
我使用的是out = rdd1.groupByKey
,但由于这种方法对于庞大的数据集可能会有很大问题,所以我想使用这个解决方案:
我没有创建成对类型 (Int, Int) 的 RDD rdd1
,而是创建了成对类型 (Int, List[Int]) 所以我的 rdd1
是像这样
rdd1 = [(1, [2]), (1, [3]), (2 , [3]), (2, [4]), (3, [5])]
但这次为了达到相同的结果,我使用 reduceByKey(_ ::: _)
按键连接所有值,这应该更快。您认为使用这种方法可能会提高性能吗?我担心这种类型 (Int, List[Int]) 创建一对值是只包含 1 个元素的列表是不是很愚蠢?
您认为是否有更快的方法来达到相同的结果,即使用其他方法?谢谢。
Since I'm working to improve performance I was wondering if my approach to remove all groupByKey calls is efficient.
查看 RDD.toDebugString
以了解您的 RDD 转换的逻辑计划。这应该可以让您很好地了解您的行动将有多快(或多慢)。
避免 ShuffledRDD
,因为它们会引发通常非常昂贵的洗牌操作。
关于您使用 reduceByKey
的想法,请考虑 keyBy
,例如
rdd.keyBy(_.kind).reduceByKey(....)
您也可以将 aggregateByKey
视为最一般的转换(位于 groupBy
和亲戚之后)。
最后但同样重要的是,groupBy
有两个允许定义分区数或 Partitioner
的变体。这些可以避免昂贵的洗牌。
继续阅读 org.apache.spark.rdd.PairRDDFunctions。
使用网络 UI 可以更好地了解 "queries" 的性能。了解你的数据会有很大帮助。也要花足够的时间(否则可能会浪费优化查询的时间)。
如果你的 end 结果是
,我认为你不应该使用reduceByKey
[(1, [2, 3]), (2 , [3, 4]), (3, [5])]
为什么?因为这就是 groupByKey
的用途,所以它可能做得最好。
groupByKey
的问题是您通常不需要具有相同键的所有值的列表(或数组),但是您可以从该列表中获得一些东西。如果你真的不需要列表,你可能可以使用 reduceByKey
.
reduceByKey
的两大优点:
- 它可以在洗牌之前开始减少(减少同一个执行器上的值,以避免不必要的网络负载)
- 它从不将具有相同键值的整个数组加载到内存中。这在大型数据集中很重要,其中数组可能有几 GB 大。
在您的情况下,正如您介绍的那样,第一点不是很重要(因为没有真正减少数据,只是串联),第二点不适用,因为您需要整个列表。
但是,我强烈建议您考虑一下您是否真的需要整个列表,或者这是否只是您计算中的一个步骤,尤其是在您处理大型数据集时。
回答这个问题可能有点晚了。它可能会帮助别人。
val tuples = List((1, 2), (1, 3), (2 , 3), (2, 4), (3, 5))
val context = getContext() // get Spark Context.
val tuplesRDD = context.parallelize(tuples)
val list = mutable.MutableList.empty[Int]
val addItemsToList = (s: mutable.MutableList[Int], v : Int) => s += v
val mergeLists = (x: mutable.MutableList[Int],
y: mutable.MutableList[Int]) => x ++= y
val groupByKey = tuplesRDD.aggregateByKey(list)(addItemsToList, mergeLists)
groupByKey.cache()
groupByKey.foreach(x => println(x))
输出
(1,MutableList(2, 3))
(2,MutableList(3, 4))
(3,MutableList(5))