groupByKey 是否比 reduceByKey 更受欢迎
Is groupByKey ever preferred over reduceByKey
当我需要在 RDD 中对数据进行分组时,我总是使用 reduceByKey
,因为它在洗牌数据之前执行 map side reduce,这通常意味着更少的数据被洗牌,因此我获得了更好的性能。即使map端reduce函数收集了所有的值,并没有真正减少数据量,我仍然使用reduceByKey
,因为我假设reduceByKey
的性能永远不会比[=13]差=].但是,我想知道这个假设是否正确,或者是否确实存在应该首选 groupByKey
的情况??
我不会发明轮子,根据代码文档,groupByKey
操作将 RDD 中每个键的值分组为一个序列,这也允许控制结果键的分区-通过传递 Partitioner
.
值对 RDD
此操作可能非常昂贵。如果您要分组以对每个键执行聚合(例如总和或平均值),使用 aggregateByKey
或 reduceByKey
将提供更好的性能。
注意:按照目前的实施,groupByKey
必须能够在内存中保存任何键的所有键值对。如果键值过多,可能会导致 OOME。
事实上,我更喜欢combineByKey
操作,但如果您对map-reduce范式不是很熟悉,有时很难理解combiner和merge的概念。为此,您可以阅读 yahoo map-reduce 圣经 here,它很好地解释了这个主题。
有关更多信息,我建议您阅读 PairRDDFunctions code。
reduceByKey
和 groupByKey
都使用具有不同 combine/merge 语义的 combineByKey
。
我看到的主要区别是 groupByKey
将标志 (mapSideCombine=false
) 传递给洗牌引擎。从问题 SPARK-772 来看,这是对洗牌引擎的提示,当数据大小不会改变时,不要 运行 mapside 组合器。
所以我想说的是,如果您尝试使用 reduceByKey
来复制 groupByKey
,您可能会看到轻微的性能下降。
我相信 climbage and eliasah 忽略了问题的其他方面:
- 代码可读性
- 代码可维护性
- 代码库大小
如果操作没有减少数据量,它必须是一种或另一种语义等同于 GroupByKey
的方式。假设我们有RDD[(Int,String)]
:
import scala.util.Random
Random.setSeed(1)
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
并且我们想要连接给定键的所有字符串。使用 groupByKey
非常简单:
rdd.groupByKey.mapValues(_.mkString(""))
reduceByKey
的天真解决方案如下所示:
rdd.reduceByKey(_ + _)
它很短而且可以说很容易理解,但有两个问题:
- 效率极低,因为它每次都会创建一个新的
String
对象*
- 表明您执行的操作比实际操作成本更低,尤其是如果您仅分析 DAG 或调试字符串
为了解决第一个问题,我们需要一个可变数据结构:
import scala.collection.mutable.StringBuilder
rdd.combineByKey[StringBuilder](
(s: String) => new StringBuilder(s),
(sb: StringBuilder, s: String) => sb ++= s,
(sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
它仍然暗示了真正正在发生的其他事情,并且非常冗长,尤其是如果在您的脚本中重复多次。你当然可以提取匿名函数
val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
sb1.append(sb2)
rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
但归根结底,这仍然意味着需要付出额外的努力来理解这段代码,增加了复杂性并且没有真正的附加值。我发现特别麻烦的一件事是明确包含可变数据结构。即使 Spark 处理了几乎所有的复杂性,这也意味着我们不再拥有优雅的、引用透明的代码。
我的观点是,如果您真的想尽一切办法减少数据量,请使用 reduceByKey
。否则你会使你的代码更难编写,更难分析并且在 return.
中一无所获。
注:
此答案主要针对 Scala RDD
API。当前的 Python 实现与其 JVM 对应物有很大不同,并且包括优化,在类似 groupBy
的操作的情况下提供比天真的 reduceByKey
实现显着的优势。
对于 Dataset
API 参见 。
* 请参阅 Spark performance for Scala vs Python 以获得令人信服的示例
当我需要在 RDD 中对数据进行分组时,我总是使用 reduceByKey
,因为它在洗牌数据之前执行 map side reduce,这通常意味着更少的数据被洗牌,因此我获得了更好的性能。即使map端reduce函数收集了所有的值,并没有真正减少数据量,我仍然使用reduceByKey
,因为我假设reduceByKey
的性能永远不会比[=13]差=].但是,我想知道这个假设是否正确,或者是否确实存在应该首选 groupByKey
的情况??
我不会发明轮子,根据代码文档,groupByKey
操作将 RDD 中每个键的值分组为一个序列,这也允许控制结果键的分区-通过传递 Partitioner
.
此操作可能非常昂贵。如果您要分组以对每个键执行聚合(例如总和或平均值),使用 aggregateByKey
或 reduceByKey
将提供更好的性能。
注意:按照目前的实施,groupByKey
必须能够在内存中保存任何键的所有键值对。如果键值过多,可能会导致 OOME。
事实上,我更喜欢combineByKey
操作,但如果您对map-reduce范式不是很熟悉,有时很难理解combiner和merge的概念。为此,您可以阅读 yahoo map-reduce 圣经 here,它很好地解释了这个主题。
有关更多信息,我建议您阅读 PairRDDFunctions code。
reduceByKey
和 groupByKey
都使用具有不同 combine/merge 语义的 combineByKey
。
我看到的主要区别是 groupByKey
将标志 (mapSideCombine=false
) 传递给洗牌引擎。从问题 SPARK-772 来看,这是对洗牌引擎的提示,当数据大小不会改变时,不要 运行 mapside 组合器。
所以我想说的是,如果您尝试使用 reduceByKey
来复制 groupByKey
,您可能会看到轻微的性能下降。
我相信 climbage and eliasah 忽略了问题的其他方面:
- 代码可读性
- 代码可维护性
- 代码库大小
如果操作没有减少数据量,它必须是一种或另一种语义等同于 GroupByKey
的方式。假设我们有RDD[(Int,String)]
:
import scala.util.Random
Random.setSeed(1)
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
并且我们想要连接给定键的所有字符串。使用 groupByKey
非常简单:
rdd.groupByKey.mapValues(_.mkString(""))
reduceByKey
的天真解决方案如下所示:
rdd.reduceByKey(_ + _)
它很短而且可以说很容易理解,但有两个问题:
- 效率极低,因为它每次都会创建一个新的
String
对象* - 表明您执行的操作比实际操作成本更低,尤其是如果您仅分析 DAG 或调试字符串
为了解决第一个问题,我们需要一个可变数据结构:
import scala.collection.mutable.StringBuilder
rdd.combineByKey[StringBuilder](
(s: String) => new StringBuilder(s),
(sb: StringBuilder, s: String) => sb ++= s,
(sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
它仍然暗示了真正正在发生的其他事情,并且非常冗长,尤其是如果在您的脚本中重复多次。你当然可以提取匿名函数
val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
sb1.append(sb2)
rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
但归根结底,这仍然意味着需要付出额外的努力来理解这段代码,增加了复杂性并且没有真正的附加值。我发现特别麻烦的一件事是明确包含可变数据结构。即使 Spark 处理了几乎所有的复杂性,这也意味着我们不再拥有优雅的、引用透明的代码。
我的观点是,如果您真的想尽一切办法减少数据量,请使用 reduceByKey
。否则你会使你的代码更难编写,更难分析并且在 return.
注:
此答案主要针对 Scala RDD
API。当前的 Python 实现与其 JVM 对应物有很大不同,并且包括优化,在类似 groupBy
的操作的情况下提供比天真的 reduceByKey
实现显着的优势。
对于 Dataset
API 参见
* 请参阅 Spark performance for Scala vs Python 以获得令人信服的示例