Apache Spark Throwing Deserialization Error when using take method on RDD
I'm new to Spark, and I'm using Scala 2.12.8 with Spark 2.4.0. I'm trying to use the random forest classifier in Spark MLLib. I can build and train the classifier, and the classifier can predict if I use the first() function on the resulting RDD. However, if I try to use the take(n) function, I get a very large, ugly stack trace. Does anyone know what I'm doing wrong? The error occurs at the line ".take(3)". I know that is the first real action I'm performing on the RDD, so if anyone can explain why it's failing and how to fix it, I'd be extremely grateful.
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ItsABreeze {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("test")
      .getOrCreate()

    // Do stuff to file
    val data: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(spark.sparkContext, "file.svm")

    // Split the data into training and test sets (30% held out for testing)
    val splits: Array[RDD[LabeledPoint]] = data.randomSplit(Array(0.7, 0.3))
    val (trainingData, testData) = (splits(0), splits(1))

    // Train a RandomForest model.
    // Empty categoricalFeaturesInfo indicates all features are continuous
    val numClasses = 4
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 3
    val featureSubsetStrategy = "auto"
    val impurity = "gini"
    val maxDepth = 5
    val maxBins = 32

    val model: RandomForestModel = RandomForest.trainClassifier(
      trainingData,
      numClasses,
      categoricalFeaturesInfo,
      numTrees,
      featureSubsetStrategy,
      impurity,
      maxDepth,
      maxBins
    )

    testData
      .map((point: LabeledPoint) => model.predict(point.features))
      .take(3)
      .foreach(println)

    spark.stop()
  }
}
The top of the stack trace follows:
java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
... 25 more
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize
at ItsABreeze$.$deserializeLambda$(ItsABreeze.scala)
... 35 more
Caused by: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize
... 36 more
Caused by: java.lang.ClassNotFoundException: scala.runtime.LambdaDeserialize
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
The code I'm trying to run is a slightly modified version of the classification example on this page (from the Spark Machine Learning Library documentation).
Both commenters on my original question were correct: I changed the Scala version I was using from 2.12.8 to 2.11.12 and reverted Spark to 2.2.1, and the code ran as-is.
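For reference, the version mismatch matters because scala/runtime/LambdaDeserialize only exists in the Scala 2.12 standard library: my jar was compiled with 2.12, but the Spark executors were running a build compiled for 2.11, so they could not deserialize the lambda inside the map call. Here is a minimal sketch of a build configuration matching the combination that worked for me (assuming sbt; the project name is a placeholder):

// build.sbt -- minimal sketch; project name is a placeholder
name := "its-a-breeze"
version := "0.1"

// The Scala binary version must match the one the Spark distribution
// was built with. Spark 2.2.1 ships built against Scala 2.11.
scalaVersion := "2.11.12"

// "%%" appends the Scala binary suffix (_2.11) to the artifact name,
// keeping the compiler and the Spark dependencies in sync.
// "provided" because the Spark runtime supplies these jars on the cluster.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.2.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.2.1" % "provided"
)

With this in place, sbt package produces a jar under target/scala-2.11/ that a 2.11-based Spark runtime can deserialize.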
For anyone qualified to answer, here's a follow-up question: Spark 2.4.0 claims new experimental support for Scala 2.12.x. Does the 2.12.x support have many known issues?