折叠动作在 Spark 中是如何工作的?
How does the fold action work in Spark?
下面我有一个 Spark fold
动作的 Scala 示例:
val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
rdd1.fold(5)(_ + _)
这会产生输出 35
。有人可以详细解释一下这个输出是如何计算出来的吗?
zeroValue
为每个分区添加一次并且应该是一个中性元素 - 在 +
的情况下它应该是 0。确切的结果将取决于分区的数量但它等同于:
rdd1.mapPartitions(iter => Iterator(iter.foldLeft(zeroValue)(_ + _))).reduce(_ + _)
所以:
val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
将数据分发为:
scala> rdd1.glom.collect
res1: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4, 5))
整个表达式等同于:
(5 + 1) + (5 + 2 + 3) + (5 + 4 + 5)
摘自 Scaladocs here(强调我的):
@param zeroValue the initial value for the accumulated result of each
partition for the op
operator, and also the initial value for the
combine results from different
partitions for the op
operator - this will typically be the neutral
element (e.g. Nil
for list concatenation or 0
for summation)
zeroValue
在您的情况下添加了四次(每个分区一次,合并分区结果时加一次)。所以结果是:
(5 + 1) + (5 + 2 + 3) + (5 + 4 + 5) + 5 // (extra one for combining results)
你知道 Spark RDD 执行分布式计算。
所以,这里的这一行,
val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
告诉 Spark 它需要在这个 RDD 中支持 3 个分区,这将使它能够 运行 使用 3 个独立的并行执行器进行计算。
现在,这里的这一行,
rdd1.fold(5)(_ + _)
告诉 spark 使用 5 作为初始值折叠所有这些分区,然后使用 5 作为初始值再次折叠来自 3 个执行程序的所有这些分区结果。
一个普通的 Scala 等价物可以写成,
val list = List(1, 2, 3, 4, 5)
val listOfList = list.grouped(2).toList
val listOfFolds = listOfList.map(l => l.fold(5)(_ + _))
val fold = listOfFolds.fold(5)(_ + _)
所以...如果您在 RDD 上使用 fold
,您需要提供 zero value
.
但是你会问 - 为什么或什么时候有人会使用 fold
而不是 reduce
?
你的困惑在于你对zero value
的看法。问题是 RDD[T] 的 zero value
并不完全取决于我们的类型 T
,还取决于计算的性质。所以你的 zero value
不需要是 0
.
让我们考虑一个简单的例子,我们想在我们的 RDD 中计算 "largest number greater than 15" or "15"
,
我们可以使用 reduce
做到这一点吗?答案是不。但是我们可以使用 fold
.
val n15GT15 = rdd1.fold(15)({ case (acc, i) => Math.max(acc, i) })
下面我有一个 Spark fold
动作的 Scala 示例:
val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
rdd1.fold(5)(_ + _)
这会产生输出 35
。有人可以详细解释一下这个输出是如何计算出来的吗?
zeroValue
为每个分区添加一次并且应该是一个中性元素 - 在 +
的情况下它应该是 0。确切的结果将取决于分区的数量但它等同于:
rdd1.mapPartitions(iter => Iterator(iter.foldLeft(zeroValue)(_ + _))).reduce(_ + _)
所以:
val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
将数据分发为:
scala> rdd1.glom.collect
res1: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4, 5))
整个表达式等同于:
(5 + 1) + (5 + 2 + 3) + (5 + 4 + 5)
摘自 Scaladocs here(强调我的):
@param zeroValue the initial value for the accumulated result of each partition for the
op
operator, and also the initial value for the combine results from different partitions for theop
operator - this will typically be the neutral element (e.g.Nil
for list concatenation or0
for summation)
zeroValue
在您的情况下添加了四次(每个分区一次,合并分区结果时加一次)。所以结果是:
(5 + 1) + (5 + 2 + 3) + (5 + 4 + 5) + 5 // (extra one for combining results)
你知道 Spark RDD 执行分布式计算。
所以,这里的这一行,
val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
告诉 Spark 它需要在这个 RDD 中支持 3 个分区,这将使它能够 运行 使用 3 个独立的并行执行器进行计算。
现在,这里的这一行,
rdd1.fold(5)(_ + _)
告诉 spark 使用 5 作为初始值折叠所有这些分区,然后使用 5 作为初始值再次折叠来自 3 个执行程序的所有这些分区结果。
一个普通的 Scala 等价物可以写成,
val list = List(1, 2, 3, 4, 5)
val listOfList = list.grouped(2).toList
val listOfFolds = listOfList.map(l => l.fold(5)(_ + _))
val fold = listOfFolds.fold(5)(_ + _)
所以...如果您在 RDD 上使用 fold
,您需要提供 zero value
.
但是你会问 - 为什么或什么时候有人会使用 fold
而不是 reduce
?
你的困惑在于你对zero value
的看法。问题是 RDD[T] 的 zero value
并不完全取决于我们的类型 T
,还取决于计算的性质。所以你的 zero value
不需要是 0
.
让我们考虑一个简单的例子,我们想在我们的 RDD 中计算 "largest number greater than 15" or "15"
,
我们可以使用 reduce
做到这一点吗?答案是不。但是我们可以使用 fold
.
val n15GT15 = rdd1.fold(15)({ case (acc, i) => Math.max(acc, i) })