累加器作为火花流中的计数器
accumulator as counter in spark streaming
我正在使用 spark streaming,我想制作一个累加器来计算到目前为止 dstream 的实例数。我想在 train 方法中增加计数器,然后我想在另一个名为 assign 的方法中使用这个计数器。下面发布的代码不起作用。我在 sparks UI 中找不到命名的累加器,当我在方法外打印它的值时,它始终为零。如果您认为有更合适的方法(我的意思是不使用累加器),请向我解释如何做。
var numInstances: LongAccumulator = null
def init(ssc:StreamingContext) : Unit = {
numInstances = ssc.sparkContext.longAccumulator("numInst")
}
def train(input: DStream[Example]): Unit = {
input.foreachRDD(rdd => {
rdd.foreach(ex => {
manager = manager.update(ex)
numInstances.add(1)
})
})
}
def assign: Array[Example] = {
if(numInstances.value <= sizeOption.getValue) {
//do something
}
else {
//do something else
}
}
由于传给foreachRDD
的函数是在驱动上执行的,所以可以这样重写train
:
var totalCount = 0L
def train(input: DStream[Example]): Unit = {
input.foreachRDD { rdd =>
totalCount += rdd.count()
rdd.foreach { ex =>
manager = manager.update(ex)
}
}
}
注意 totalCount
值 - Spark 不会分发更改,并且仅在驱动程序上有效,因此您不能在执行程序的代码 运行 中使用它。
我正在使用 spark streaming,我想制作一个累加器来计算到目前为止 dstream 的实例数。我想在 train 方法中增加计数器,然后我想在另一个名为 assign 的方法中使用这个计数器。下面发布的代码不起作用。我在 sparks UI 中找不到命名的累加器,当我在方法外打印它的值时,它始终为零。如果您认为有更合适的方法(我的意思是不使用累加器),请向我解释如何做。
var numInstances: LongAccumulator = null
def init(ssc:StreamingContext) : Unit = {
numInstances = ssc.sparkContext.longAccumulator("numInst")
}
def train(input: DStream[Example]): Unit = {
input.foreachRDD(rdd => {
rdd.foreach(ex => {
manager = manager.update(ex)
numInstances.add(1)
})
})
}
def assign: Array[Example] = {
if(numInstances.value <= sizeOption.getValue) {
//do something
}
else {
//do something else
}
}
由于传给foreachRDD
的函数是在驱动上执行的,所以可以这样重写train
:
var totalCount = 0L
def train(input: DStream[Example]): Unit = {
input.foreachRDD { rdd =>
totalCount += rdd.count()
rdd.foreach { ex =>
manager = manager.update(ex)
}
}
}
注意 totalCount
值 - Spark 不会分发更改,并且仅在驱动程序上有效,因此您不能在执行程序的代码 运行 中使用它。