Scala: Task not serializable in RDD map Caused by json4s "implicit val formats = DefaultFormats"
The following program tries to call 3 functions on every ROW (inside an RDD map):
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

class TagCalculation extends Serializable {
  def test1(x: String) = x + " test1"
  def test2(x: String) = x + "test2"
  def test3(x: String) = x + "test3"
  def test5(arg1: java.lang.Integer, arg2: String, arg3: scala.collection.immutable.$colon$colon[Any]) = "test mix2"
}

val df = sqlContext.createDataFrame(Seq((1, "Android"), (2, "iPhone")))

val get_test = new TagCalculation
val field = Array("test1", "test2", "test3")

val bb = df.rdd.map(row => {
  val reValue1 = "start"
  val ret = for (every <- field)
    yield {
      val test_para = Array(reValue1)
      val argtypes = test_para.map(_.getClass)
      val method4 = get_test.getClass.getMethod(every, argtypes: _*)
      val bbq = method4.invoke(get_test, test_para: _*)
      if (field.last == every)
        bbq
    }
  ret.last
})
But it throws an error:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:314)
    at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:313)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.map(RDD.scala:313)
    ........
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$
Any pointers?

It is probably caused by "implicit val formats = DefaultFormats", but I need to extract the values before the "map".
The problem is that you define the TagCalculation class inside the calling class where it is initialized and used. Because an inner class keeps a reference to its enclosing instance, serializing get_test pulls the whole outer object into the closure, including the non-serializable DefaultFormats, which is exactly what the exception complains about. Just move TagCalculation outside the calling class, or make it a separate class, and the NotSerializableException should go away.