如何从任务中打印累加器变量(似乎 "work" 不调用值方法)?

How to print accumulator variable from within task (seem to "work" without calling value method)?

我知道累加器变量是 'write only' 从任务的角度来看,当它们在工作节点中执行时。我正在对此进行一些测试,我意识到我能够在任务中打印累加器值。

这里我在驱动中初始化累加器:-

scala> val accum  = sc.accumulator(123)
accum: org.apache.spark.Accumulator[Int] = 123

那我接着定义一个函数'foo':-

scala> def foo(pair:(String,String)) = { println(accum); pair }
foo: (pair: (String, String))(String, String)

在这个函数中,我只是简单地打印累加器,然后我 return 收到的是同一对。

现在我有一个名为 myrdd 的 RDD,类型如下:-

scala> myrdd
res13: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:21

我现在在这个 RDD 上调用映射转换:-

myrdd.map(foo).collect

'collect' 操作正在应用于强制评估。所以这里实际发生的是,在此执行期间,为 RDD 的每一行打印一个零 (0)。因为这个 RDD 有 4 个元素,所以它打印 0 4 次。由于有 'collect' 动作,它最后也打印了所有元素,但这不是这里的重点。所以我有两个问题:-

  1. 从逻辑上讲,打印等同于阅读,因为只有会阅读,才能打印。那么为什么允许这样做呢?如果我们尝试 'return' 函数中的累加器,为什么没有抛出异常?
  2. 当我们在驱动程序中将它初始化为 123 时,为什么它打印 0 作为累加器的值?

经过一些实验,我发现如果我更改函数定义以访问累加器对象 (accum.value) 的实际值 属性,然后如前所述触发 RDD 操作,它确实确实抛出异常:-

scala> def foo(pair:(String,String)) = { println(accum.value); pair }

RDD求值时出现的异常:-

Can't read accumulator value in the task

所以我之前所做的是尝试打印累加器对象本身。但是问题仍然存在,为什么它打印 0?因为在驱动程序级别,如果我发出我在函数定义中使用的相同命令,我确实得到值 123:-

scala> println(accum)
123

我不需要说 println(accum.value) 就可以工作。那么,为什么只有当我在任务使用的函数中发出此命令时,它才打印 0?

Why is it printing 0 as the value of the accumulator, when we had initiated it as 123 in the driver?

因为工作节点永远不会看到初始值。唯一传递给工人的是 zero,如 AccumulatorParam 中所定义。对于 Accumulator[Int],它只是 0。如果您首先更新累加器,您将看到更新的 local 值:

val acc = sc.accumulator(123)
val rdd = sc.parallelize(List(1, 2, 3))
rdd.foreach(i => {acc += i; println(acc)})

单分区时更清晰:

rdd.repartition(1).foreach(i => {acc += i; println(acc)}

Why was the exception not thrown (...)?

因为访问时抛出异常 value method, and toString 根本没有使用它。相反,它使用私有 value_ 变量,与 value 如果 !deserialized 检查通过则返回的变量相同。