Apache Spark Accumulable addInPlace 需要 return 的 R1?或者有什么价值?

Apache Spark Accumulable addInPlace requires return of R1? Or any value?

From the Spark source code for Accumulable is the addInPlace method 合并来自不同分区的相同 Accumulable 的值:

/**
 * Merge two accumulated values together. Is allowed to modify and return the first value
 * for efficiency (to avoid allocating objects).
 *
 * @param r1 one set of accumulated data
 * @param r2 another set of accumulated data
 * @return both data sets merged together
 */
def addInPlace(r1: R, r2: R): R

我假设在我的 AccumulableParam 实现中定义 addInPlace 时,我可以 return 任何我想要的值。我假设我作为 r1 传入的任何指针都会指向我 return.

我的老板认为传入的 r1 是 return 语句中唯一允许的东西。这听起来像安兰德斯,谁是对的?

有一种情况我只想扔掉r1并用r2中的对象替换它,这将是这个合并累加器的新值。

我可以只 return r2 还是必须像我的老板认为的那样对 r1 进行深拷贝(Java 编程经验多得多)?需要明确的是,虽然 Spark 当然是用 Scala 编写的,但我正在编写 class 在 Java.

中实现 AccumulableParam

根据经验,在执行类似折叠的操作时,您应该永远不要修改第二个参数。我们可以用一个简单的例子来说明为什么。假设我们有这样的简单累加器:

import org.apache.spark.AccumulatorParam
import scala.collection.mutable.{Map => MMap}

type ACC = MMap[String, Int]

object DummyAccumulatorParam extends AccumulatorParam[ACC] {
  def zero(initialValue: ACC): ACC = {
    initialValue
  }

  def addInPlace(acc: ACC, v: ACC): ACC = {
    v("x") = acc.getOrElse("x", 0) +  v.getOrElse("x", 0)
    v
  }
}

特别有用,但没关系。重点是它修改了第二个参数。让我们看看它是否有效:

val rdd = sc.parallelize(Seq(MMap("x" -> 1), MMap("x" -> 1), MMap("x" -> 1)), 1)

val accum1 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum1 += x)

accum1.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)

到目前为止一切顺利。我们甚至可以创建另一个,它仍然可以按预期工作:

val accum2 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum2 += x)

accum2.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)

现在缓存数据:

rdd.cache

重复该过程:

val accum3 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum3 += x)

val accum4 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum4 += x)

并检查累加器值:

accum4.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 6)

和RDD内容:

rdd.collect
// Array[scala.collection.mutable.Map[String,Int]] = 
//  Array(Map(x -> 1), Map(x -> 3), Map(x -> 6))

因此,如您所见,return 或修改第二个参数是不安全的。它也适用于 foldaggregate 等其他类似操作。