Scalatest and Spark giving "java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper"
I am testing a Spark Streaming application with the help of "com.holdenkarau.spark-testing-base" and ScalaTest.
import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfter, FunSuite }

class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase {

  var delim: String = ","

  before {
    System.clearProperty("spark.driver.port")
  }

  test("This Fails") {
    val source = scala.io.Source.fromURL(getClass.getResource("/some_logs.csv"))
    val input = source.getLines.toList
    val rowRDDOut = Calculator.do(sc.parallelize(input)) // returns a DataFrame
    val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + delim + row.getAs[String](1)))
    source.close
  }
}
I get a serialization exception on the field 'delim':
org.apache.spark.SparkException: Task not serializable
[info] at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
[info] at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
[info] at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
[info] at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
[info] at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:324)
[info] at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:323)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
[info] at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
[info] at org.apache.spark.rdd.RDD.map(RDD.scala:323)
[info] ...
[info] Cause: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
[info] Serialization stack:
[info] - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@78b339fa)
[info] - field (class: org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)
If I replace 'delim' with a string literal, it works fine:
val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + "," + row.getAs[String](1)))
What is the difference between the first and the second version? Thanks in advance!
The problem is not the type of delim (a String), but delim itself. Try not to define variables outside the test() method. If you define delim inside the test, it should work:
test("This Fails") {
  val delim = ","
  ...
}
Now, you may ask why? Well, when you reference delim from the outer scope, Scala has to serialize the enclosing instance of class Test along with the closure. That object holds a reference to org.scalatest.Assertions$AssertionsHelper, which is not serializable (see your stack trace).
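For reference, here is a minimal sketch of that idea (the class name and sample data are made up; it assumes sc is the SparkContext exposed by StreamingSuiteBase, as in the question). An equivalent workaround, if you want to keep the member, is to copy it into a local val right before the closure, so the lambda captures only a String:

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite

class DelimTest extends FunSuite with StreamingSuiteBase {

  // The member can stay, as in the question ...
  var delim: String = ","

  test("closure captures only serializable values") {
    // ... but the closure below uses a local copy, so it does not drag
    // the enclosing suite (and its AssertionsHelper) onto the executors.
    val localDelim = delim

    val input: RDD[String] = sc.parallelize(List("a,b", "c,d"))
    val report: RDD[String] = input.map(line => line.split(localDelim).mkString(" "))

    assert(report.collect().toList == List("a b", "c d"))
  }
}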
I ran into this problem today. Even though I moved all the code inside the test as suggested, the error persisted. In the end it turned out I was using the wrong syntax in my code (which the compiler did not catch). In my case it was this:
// Wrong
df.filter(x => x.id === y)
// Right
df.filter(x => x.id == y)
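A likely explanation for why this compiles but still fails at run time: inside a FunSuite, === on ordinary values resolves to ScalaTest's TripleEquals operator, which is supplied by an implicit conversion defined on the suite itself, so the lambda silently captures the whole (non-serializable) suite. A minimal sketch of the pitfall, reusing the question's setup with a made-up class name and data:

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.scalatest.FunSuite

class FilterSyntaxTest extends FunSuite with StreamingSuiteBase {

  test("filter with plain equality") {
    val target = 2
    val numbers = sc.parallelize(Seq(1, 2, 3))

    // Compiles, but throws "Task not serializable": here === is ScalaTest's
    // triple-equals, brought in by an implicit conversion on the suite, so
    // the closure captures `this`.
    // val hits = numbers.filter(x => x === target).collect()

    // Works: plain Scala equality does not reference the suite.
    val hits = numbers.filter(x => x == target).collect()
    assert(hits.sameElements(Array(2)))
  }
}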