Spark RDD:多个 reducebykey 或一次

Spark RDD: multiple reducebykey or just once

我有如下代码:

// make a rd according to an id
def makeRDD(id:Int, data:RDD[(VertexId, Double)]):RDD[(Long, Double)] = { ... }  
val data:RDD[(VertexId, Double)] = ... // loading from hdfs
val idList = (1 to 100)
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_)
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_))

rst1 和 rst2 得到样本结果。我认为 rst1 需要更多内存(100 倍)但只需要一个 reduceByKey 转换;然而,rst2 需要更少的内存,但需要更多的 reduceByKey 转换(99 次)。那么,这是一场时间游戏和 space 权衡吗?

我的问题是:我上面的分析是否正确,或者Spark在内部以同样的方式对待翻译动作?

P.S.: rst1 union all sub rdd 然后reduceByKey,reduceByKey是outsidereduce。 rst2 一个一个地reduceByKey,其中reduceByKey在里面 reduce.

长话短说,两种解决方案的效率都相对较低,但第二种比第一种更差。

让我们从回答最后一个问题开始。对于低级 RDD API 只有两种类型的全局自动优化(而不是):

  • 使用显式或隐式缓存的任务结果而不是重新计算完整的谱系
  • 将不需要随机播放的多个转换组合成单个 ShuffleMapStage

其他一切几乎都是定义 DAG 的顺序转换。这与更具限制性的高级 Dataset (DataFrame) API 形成对比,后者对转换做出特定假设并执行执行计划的全局优化。

关于您的代码。第一个解决方案的最大问题是当您应用迭代 union 时,血统不断增长。它使某些事情变得昂贵,例如故障恢复,并且由于 RDD 是递归定义的,因此可能会因 Whosebug 异常而失败。一个不太严重的副作用是越来越多的分区似乎没有在随后的减少中得到补偿*。你会在我对 的回答中找到更详细的解释,但你真正需要的是像这样的单个 union

sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_)

这实际上是一个最佳解决方案,假设您应用真正的减少功能。

第二种解决方案显然遇到了同样的问题,但情况变得更糟。虽然第一种方法只需要两个阶段和一次洗牌,但这需要对每个 RDD 进行一次洗牌。由于分区数量在增长,并且您使用默认 HashPartitioner 每条数据都必须多次写入磁盘,并且很可能通过网络多次洗牌。忽略低级计算,每条记录被洗牌 O(N) 次,其中 N 是您合并的 RDD 的数量。

关于内存使用情况,在不了解更多数据分布的情况下并不明显,但在最坏的情况下,第二种方法可以表现出更糟糕的行为。

如果+ 与常量space 一起工作,则减少的唯一要求是一个hashmap 来存储map side combine 的结果。由于分区是作为数据流处理的,没有将完整的内容读入内存,这意味着每个任务的总内存大小将与唯一键的数量而不是数据量成正比。由于第二种方法需要更多任务,因此总体内存使用量将高于第一种情况。平均而言,由于数据是部分组织的,它可能会稍微好一些,但不太可能补偿额外的成本。


* 如果您想了解它如何影响整体性能,您可以查看 这是一个略有不同的问题,但应该让您了解为什么控制分区数量很重要。