Apache Spark Accumulable addInPlace 需要 return 的 R1?或者有什么价值?
Apache Spark Accumulable addInPlace requires return of R1? Or any value?
From the Spark source code for Accumulable is the addInPlace method 合并来自不同分区的相同 Accumulable 的值:
/**
* Merge two accumulated values together. Is allowed to modify and return the first value
* for efficiency (to avoid allocating objects).
*
* @param r1 one set of accumulated data
* @param r2 another set of accumulated data
* @return both data sets merged together
*/
def addInPlace(r1: R, r2: R): R
我假设在我的 AccumulableParam 实现中定义 addInPlace 时,我可以 return 任何我想要的值。我假设我作为 r1 传入的任何指针都会指向我 return.
我的老板认为传入的 r1 是 return 语句中唯一允许的东西。这听起来像安兰德斯,谁是对的?
有一种情况我只想扔掉r1并用r2中的对象替换它,这将是这个合并累加器的新值。
我可以只 return r2 还是必须像我的老板认为的那样对 r1 进行深拷贝(Java 编程经验多得多)?需要明确的是,虽然 Spark 当然是用 Scala 编写的,但我正在编写 class 在 Java.
中实现 AccumulableParam
根据经验,在执行类似折叠的操作时,您应该永远不要修改第二个参数。我们可以用一个简单的例子来说明为什么。假设我们有这样的简单累加器:
import org.apache.spark.AccumulatorParam
import scala.collection.mutable.{Map => MMap}
type ACC = MMap[String, Int]
object DummyAccumulatorParam extends AccumulatorParam[ACC] {
def zero(initialValue: ACC): ACC = {
initialValue
}
def addInPlace(acc: ACC, v: ACC): ACC = {
v("x") = acc.getOrElse("x", 0) + v.getOrElse("x", 0)
v
}
}
特别有用,但没关系。重点是它修改了第二个参数。让我们看看它是否有效:
val rdd = sc.parallelize(Seq(MMap("x" -> 1), MMap("x" -> 1), MMap("x" -> 1)), 1)
val accum1 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum1 += x)
accum1.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)
到目前为止一切顺利。我们甚至可以创建另一个,它仍然可以按预期工作:
val accum2 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum2 += x)
accum2.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)
现在缓存数据:
rdd.cache
重复该过程:
val accum3 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum3 += x)
val accum4 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum4 += x)
并检查累加器值:
accum4.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 6)
和RDD内容:
rdd.collect
// Array[scala.collection.mutable.Map[String,Int]] =
// Array(Map(x -> 1), Map(x -> 3), Map(x -> 6))
因此,如您所见,return 或修改第二个参数是不安全的。它也适用于 fold
或 aggregate
等其他类似操作。
From the Spark source code for Accumulable is the addInPlace method 合并来自不同分区的相同 Accumulable 的值:
/**
* Merge two accumulated values together. Is allowed to modify and return the first value
* for efficiency (to avoid allocating objects).
*
* @param r1 one set of accumulated data
* @param r2 another set of accumulated data
* @return both data sets merged together
*/
def addInPlace(r1: R, r2: R): R
我假设在我的 AccumulableParam 实现中定义 addInPlace 时,我可以 return 任何我想要的值。我假设我作为 r1 传入的任何指针都会指向我 return.
我的老板认为传入的 r1 是 return 语句中唯一允许的东西。这听起来像安兰德斯,谁是对的?
有一种情况我只想扔掉r1并用r2中的对象替换它,这将是这个合并累加器的新值。
我可以只 return r2 还是必须像我的老板认为的那样对 r1 进行深拷贝(Java 编程经验多得多)?需要明确的是,虽然 Spark 当然是用 Scala 编写的,但我正在编写 class 在 Java.
中实现 AccumulableParam根据经验,在执行类似折叠的操作时,您应该永远不要修改第二个参数。我们可以用一个简单的例子来说明为什么。假设我们有这样的简单累加器:
import org.apache.spark.AccumulatorParam
import scala.collection.mutable.{Map => MMap}
type ACC = MMap[String, Int]
object DummyAccumulatorParam extends AccumulatorParam[ACC] {
def zero(initialValue: ACC): ACC = {
initialValue
}
def addInPlace(acc: ACC, v: ACC): ACC = {
v("x") = acc.getOrElse("x", 0) + v.getOrElse("x", 0)
v
}
}
特别有用,但没关系。重点是它修改了第二个参数。让我们看看它是否有效:
val rdd = sc.parallelize(Seq(MMap("x" -> 1), MMap("x" -> 1), MMap("x" -> 1)), 1)
val accum1 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum1 += x)
accum1.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)
到目前为止一切顺利。我们甚至可以创建另一个,它仍然可以按预期工作:
val accum2 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum2 += x)
accum2.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)
现在缓存数据:
rdd.cache
重复该过程:
val accum3 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum3 += x)
val accum4 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum4 += x)
并检查累加器值:
accum4.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 6)
和RDD内容:
rdd.collect
// Array[scala.collection.mutable.Map[String,Int]] =
// Array(Map(x -> 1), Map(x -> 3), Map(x -> 6))
因此,如您所见,return 或修改第二个参数是不安全的。它也适用于 fold
或 aggregate
等其他类似操作。