Explanation of the fold method of Spark RDD

I am running Spark-1.4.0 pre-built for Hadoop-2.4 (local mode) to compute the sum of squares of a DoubleRDD. My Scala code looks like

sc.parallelize(Array(2., 3.)).fold(0.0)((p, v) => p+v*v)

and it surprisingly gives the result 97.0.

This is very counter-intuitive compared to the Scala version of fold:
Array(2., 3.).fold(0.0)((p, v) => p+v*v)

which gives the expected answer of 13.0.

It is quite possible that I made some tricky mistake in the code due to a lack of understanding. I have read that the function used in RDD.fold() should be commutative, otherwise the result may depend on partitioning, etc. So, for example, if I change the number of partitions to 1,

sc.parallelize(Array(2., 3.), 1).fold(0.0)((p, v) => p+v*v)

the code gives 169.0 on my machine!

Can someone explain what exactly is happening here?

Actually, the official documentation explains it quite well:

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
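Read literally, this means the zero value takes part twice: once inside each partition and once more when the per-partition results are merged on the driver. A rough local model of that two-level application (simulatedRddFold is just an illustrative helper, not part of the Spark API):

// Purely illustrative model of RDD.fold: each partition is folded with the
// zero value, and the per-partition results are then folded again with the
// same op and the same zero value.
def simulatedRddFold[T](partitions: Seq[Seq[T]], zero: T)(op: (T, T) => T): T =
  partitions.map(_.foldLeft(zero)(op)).foldLeft(zero)(op)

// With the op above and the two elements in separate partitions, the
// per-partition results are 4.0 and 9.0, and the merge step squares them
// again: 0 + 4*4 = 16, then 16 + 9*9 = 97.0
simulatedRddFold(Seq(Seq(2.0), Seq(3.0)), 0.0)((p, v) => p + v * v)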

To illustrate what is going on, let's try to simulate it step by step:

val rdd = sc.parallelize(Array(2., 3.))

val byPartition = rdd.mapPartitions(
    iter => Array(iter.fold(0.0)((p, v) => (p + v * v))).toIterator).collect()

It gives us something similar to this: Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0), and then

byPartition.reduce((p, v) => (p + v * v))

returns 97
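To see where that 97 comes from: the merge step applies the same op to the per-partition results, so the already-accumulated 4.0 and 9.0 get squared once more. A quick local check of that arithmetic:

// Merge with the same op: 0 + 4*4 = 16.0, then 16 + 9*9 = 97.0
// (the zeros from the empty partitions leave the accumulator unchanged)
Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0).fold(0.0)((p, v) => p + v * v)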

The important thing to note here is that the result can differ from run to run, depending on the order in which the partitions are combined.
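The same mechanics explain the 169.0 seen with a single partition: the partition folds to 0 + 2*2 + 3*3 = 13.0, and the driver then applies the op once more, 0 + 13*13 = 169.0. A minimal sketch of one way to get the intended 13.0, assuming sc is a live SparkContext: do the squaring per element and keep the merge operation associative and commutative, so partitioning no longer matters.

// Square each element first, then combine with plain addition, which is
// both associative and commutative.
sc.parallelize(Array(2.0, 3.0)).map(v => v * v).fold(0.0)(_ + _)

// Equivalent, using aggregate to separate the per-element step (seqOp)
// from the partition-merge step (combOp).
sc.parallelize(Array(2.0, 3.0)).aggregate(0.0)((p, v) => p + v * v, (a, b) => a + b)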