Spark serialization issue with HashMap
I am trying to serialize a Java class whose objects will be collected into a Java Spark RDD. The POJO contains data structures such as a HashMap and an ArrayList. When I iterate over the RDD objects in the driver and try to fetch HashMap elements, a serialization exception is thrown:
java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Sample program:
Driver class (main implementation):
public class MyMainTest {
    public static ArrayList<MyInterface> call() throws Exception {
        ArrayList<MyInterface> li = new ArrayList<MyInterface>();
        for (int i = 0; i < 3; i++) {
            MyRecord myrec = new MyRecord();
            li.add(myrec.addToTagMap());
        }
        return li;
    }
}
My POJO class:
public class MyRecord implements MyInterface, Serializable {
    protected HashMap<String, Object> allTagNameValueMap_
            = new HashMap<String, Object>() {
        {
            put("key1", "T1");
            put("key2", "Val2");
            put("key3", 3);
        }
    };

    protected MyRecord addToTagMap() throws Exception {
        MyRecord myRec = new MyRecord();
        myRec.allTagNameValueMap_.put("key1", "New Value");
        myRec.allTagNameValueMap_.put("key2", "New Value2");
        myRec.allTagNameValueMap_.put("key4", 22);
        return myRec;
    }

    @Override
    public Object getKey1() {
        return allTagNameValueMap_.get("key1");
    }

    @Override
    public Object getKey2() {
        return allTagNameValueMap_.get("key2");
    }

    @Override
    public Object getKey3() {
        return allTagNameValueMap_.get("key3");
    }
}
My Interface:
public interface MyInterface extends Serializable {
    public Object getKey1();
    public Object getKey2();
    public Object getKey3();
}
Serialization trace:
allTagNameValueMap_ (com.org.util.ex.ExRecord)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
allTagNameValueMap_ (com.org.util.ex.ExRecord)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36)
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$$anonfun$apply$mcV$sp.apply(ParallelCollectionRDD.scala:80)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$$anonfun$apply$mcV$sp.apply(ParallelCollectionRDD.scala:80)
at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:130)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject.apply$mcV$sp(ParallelCollectionRDD.scala:80)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137)
... 20 more
Caused by: java.lang.NullPointerException
at java.util.HashMap.put(HashMap.java:493)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
Is there anything special to keep in mind when serializing a HashMap in Spark?
I solved the issue. These are the steps I followed.
1. Create a Kryo registrator class and register the POJO with Kryo:
public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the POJO so Kryo knows how to serialize it
        kryo.register(MyRecord.class);
    }
}
2. Set the serializer and the Kryo registrator class in the Spark conf in my main method:
sConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
sConf.set("spark.kryo.registrator","com.spark.util.umf.MyKryoRegistrator");
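For reference, the same two settings can also be supplied outside the code, e.g. in `spark-defaults.conf` (assuming the same fully qualified registrator class name):

```
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator  com.spark.util.umf.MyKryoRegistrator
```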
3. Initialize the HashMap in the POJO from a constructor instead of with the double-brace initializer:
protected Map<String, Object> allTagNameValueMap_ = new HashMap<String, Object>();

public MyRecord() {
    allTagNameValueMap_.put("key1", "V1");
    allTagNameValueMap_.put("key2", "V2");
    allTagNameValueMap_.put("key3", "V3");
}
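My understanding of why step 3 matters: the double-brace idiom `new HashMap<String, Object>() {{ ... }}` does not create a plain `HashMap` but an anonymous subclass of it, which carries a synthetic reference to the enclosing instance and is a class Kryo's field serializer may fail to reconstruct. A minimal, self-contained check of this (the class name `AnonMapDemo` is made up for illustration):

```java
import java.util.HashMap;

public class AnonMapDemo {
    // Double-brace initialization: the compiler generates an anonymous
    // subclass of HashMap (e.g. AnonMapDemo$1) that holds a hidden
    // reference back to the enclosing AnonMapDemo instance.
    final HashMap<String, Object> map = new HashMap<String, Object>() {{
        put("key1", "T1");
    }};

    public static void main(String[] args) {
        AnonMapDemo demo = new AnonMapDemo();
        // The runtime class is not java.util.HashMap itself:
        System.out.println(demo.map.getClass() == HashMap.class);   // false
        System.out.println(demo.map.getClass().isAnonymousClass()); // true
    }
}
```

A plain field plus constructor initialization, as in step 3, keeps the field's runtime type as `java.util.HashMap`, which Kryo handles out of the box.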