累加器作为火花流中的计数器

accumulator as counter in spark streaming

我正在使用 spark streaming,我想制作一个累加器来计算到目前为止 dstream 的实例数。我想在 train 方法中增加计数器,然后我想在另一个名为 assign 的方法中使用这个计数器。下面发布的代码不起作用。我在 sparks UI 中找不到命名的累加器,当我在方法外打印它的值时,它始终为零。如果您认为有更合适的方法(我的意思是不使用累加器),请向我解释如何做。

var numInstances: LongAccumulator = null

def init(ssc:StreamingContext) : Unit = {
numInstances = ssc.sparkContext.longAccumulator("numInst")
}

def train(input: DStream[Example]): Unit = {
input.foreachRDD(rdd => {
  rdd.foreach(ex => {
    manager = manager.update(ex)
    numInstances.add(1) 
   })
})
}

def assign: Array[Example] = {
 if(numInstances.value <= sizeOption.getValue) {
  //do something
 }
 else {
  //do something else
 }
}

由于传给foreachRDD的函数是在驱动上执行的,所以可以这样重写train

var totalCount = 0L
def train(input: DStream[Example]): Unit = {
  input.foreachRDD { rdd =>
   totalCount += rdd.count() 
   rdd.foreach { ex =>
     manager = manager.update(ex)
   }
  }
}

注意 totalCount 值 - Spark 不会分发更改,并且仅在驱动程序上有效,因此您不能在执行程序的代码 运行 中使用它。