无法声明 String 类型的累加器
Not able to declare String type accumulator
我试图在 Scala shell(驱动程序)中定义一个字符串类型的累加器变量,但我不断收到以下错误:-
scala> val myacc = sc.accumulator("Test")
<console>:21: error: could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[String]
val myacc = sc.accumulator("Test")
^
对于 Int 或 Double 类型的累加器,这似乎不是问题。
谢谢
这是因为 Spark 默认只提供 Long
、Double
和 Float
类型的累加器。如果您需要其他东西,则必须扩展 AccumulatorParam
.
import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] {
def zero(initialValue: String): String = {
""
}
def addInPlace(s1: String, s2: String): String = {
s"$s1 $s2"
}
}
val stringAccum = sc.accumulator("")(StringAccumulatorParam)
val rdd = sc.parallelize("foo" :: "bar" :: Nil, 2)
rdd.foreach(s => stringAccum += s)
stringAccum.value
注:
一般来说,您应该避免将累加器用于数据可能随时间显着增长的任务。它的行为将类似于 group
和 collect
并且在最坏的情况下可能会由于缺乏资源而失败。累加器主要用于简单的诊断任务,例如跟踪基本统计数据。
我试图在 Scala shell(驱动程序)中定义一个字符串类型的累加器变量,但我不断收到以下错误:-
scala> val myacc = sc.accumulator("Test")
<console>:21: error: could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[String]
val myacc = sc.accumulator("Test")
^
对于 Int 或 Double 类型的累加器,这似乎不是问题。
谢谢
这是因为 Spark 默认只提供 Long
、Double
和 Float
类型的累加器。如果您需要其他东西,则必须扩展 AccumulatorParam
.
import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] {
def zero(initialValue: String): String = {
""
}
def addInPlace(s1: String, s2: String): String = {
s"$s1 $s2"
}
}
val stringAccum = sc.accumulator("")(StringAccumulatorParam)
val rdd = sc.parallelize("foo" :: "bar" :: Nil, 2)
rdd.foreach(s => stringAccum += s)
stringAccum.value
注:
一般来说,您应该避免将累加器用于数据可能随时间显着增长的任务。它的行为将类似于 group
和 collect
并且在最坏的情况下可能会由于缺乏资源而失败。累加器主要用于简单的诊断任务,例如跟踪基本统计数据。