Spark map job exception: Object not serializable
Ws Class
package MPD1

import java.util.UUID.randomUUID

class Ws(typ: String, c: Float, t: Float, r: Float, a: Float, s: Float, cu: Float) {
  var styp = typ
  var uid = randomUUID().toString
  var cost: Float = c
  var time: Float = t
  var reliability: Float = r
  var availability: Float = a
  var security: Float = s
  var customAttributes: Float = cu

  // Auxiliary constructor: a Scala constructor must be named `this` and
  // delegate to the primary constructor (a method named `Ws` would just be
  // an ordinary method). Defaults customAttributes to Float.MaxValue.
  def this(typ: String, c: Float, t: Float, r: Float, a: Float, s: Float) =
    this(typ, c, t, r, a, s, Float.MaxValue)

  def display(): Unit = {
    println("STyp : " + styp + "| UID : " + uid + "|" +
      "cost :" + cost + "|" + "time :" + time + "|" +
      "reliability :" + reliability + "|" + "availability :" + availability + "|" +
      "security :" + security + "|" + "customAttributes :" + customAttributes + "|")
  }
}
I want to create objects of class Ws and then parallelize them so that further map-reduce jobs can run. The main function:
package MPD1

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
import java.lang.Exception

object test {
  def main(args: Array[String]): Unit = {
    try {
      val conf = new SparkConf().setAppName("Simple Application")
      val sc = new SparkContext(conf)
      println(" \nHello World from Scala!\n")

      var wsArray = new Array[MPD1.Ws](10000)
      var i: Int = 0
      val filename = "/home/nazi/Downloads/file.csv"
      var lines = sc.textFile(filename)

      var rddWsAll = lines.map(f => Functions.createdDS(f))
      rddWsAll.collect().take(10).foreach(f => f.display())

      for (line <- lines) {
        var tempStr = line.split(",")
        println(tempStr(0).toString())
        var wsTemp: Ws = new Ws(tempStr(0).toString(), tempStr(2).toFloat, tempStr(3).toFloat,
          tempStr(4).toFloat, tempStr(5).toFloat, tempStr(6).toFloat, tempStr(7).toFloat)
        wsArray(i) = wsTemp
        wsTemp.display()
        i = i + 1
      }
    }
    catch {
      case e: javax.script.ScriptException => e.printStackTrace
    }
  }
}
package MPD1

object Functions {
  def createdDS(f: String): Ws = {
    val tempStr = f.split(",")
    new Ws(tempStr(0).toString(), tempStr(2).toFloat, tempStr(3).toFloat,
      tempStr(4).toFloat, tempStr(5).toFloat, tempStr(6).toFloat, tempStr(7).toFloat)
  }
}
I am getting this error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: MPD1.Ws
Serialization stack:
- object not serializable (class: MPD1.Ws, value: MPD1.Ws@47acf13d)
- element of array (index: 0)
- array (class [LMPD1.Ws;, size 10000)
....
.....
I am guessing that the Ws class inside the map function

var rddWsAll = lines.map(f => Functions.createdDS(f))

is causing the problem. But why are we not allowed to create RDDs from a class? Or are we only allowed to create RDDs from strings?
I am using:

scalaVersion := "2.11.8"
Spark version: 2.2.1
It looks like class Ws is not serializable. Make it serializable:

class Ws(typ: String, c: Float, t: Float, r: Float, a: Float, s: Float, cu: Float) extends java.io.Serializable { ...

Note that RDDs themselves are serializable by default, whatever their element type; the failure happens when collect() ships the Ws results from the executors back to the driver, and that step requires the element class to be serializable.
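A minimal sketch of the fix, keeping the fields and constructors from the question; only the class declaration changes:

package MPD1

import java.util.UUID.randomUUID

// Extending java.io.Serializable lets Spark ship Ws instances
// between the executors and the driver (e.g. for collect()).
class Ws(typ: String, c: Float, t: Float, r: Float, a: Float, s: Float, cu: Float)
  extends java.io.Serializable {

  var styp = typ
  var uid = randomUUID().toString
  var cost: Float = c
  var time: Float = t
  var reliability: Float = r
  var availability: Float = a
  var security: Float = s
  var customAttributes: Float = cu

  // Auxiliary constructor, delegating to the primary one.
  def this(typ: String, c: Float, t: Float, r: Float, a: Float, s: Float) =
    this(typ, c, t, r, a, s, Float.MaxValue)

  // display() unchanged from the question.
}

With that change rddWsAll.collect() succeeds. As a side note, rddWsAll.take(10).foreach(_.display()) is a lighter way to inspect a sample, since collect().take(10) first pulls the entire RDD to the driver. Alternatively, declaring Ws as a case class would make it serializable automatically.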