Spark 序列化和 Java 序列化有什么区别?
What is the difference between Spark Serialization and Java Serialization?
我正在使用 Spark + Yarn,我有一个服务想在分布式节点上调用。
当我在使用 java 序列化的 Junit 测试中序列化此服务对象 "by hand" 时,服务的所有内部集合都很好地序列化和反序列化:
@Test
public void testSerialization() {
try (
ConfigurableApplicationContext contextBusiness = new ClassPathXmlApplicationContext("spring-context.xml");
FileOutputStream fileOutputStream = new FileOutputStream("myService.ser");
ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
) {
final MyService service = (MyService) contextBusiness.getBean("myServiceImpl");
objectOutputStream.writeObject(service);
objectOutputStream.flush();
} catch (final java.io.IOException e) {
logger.error(e.getMessage(), e);
}
}
@Test
public void testDeSerialization() throws ClassNotFoundException {
try (
FileInputStream fileInputStream = new FileInputStream("myService.ser");
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
) {
final MyService myService = (MyService) objectInputStream.readObject();
// HERE a functionnal test who proves the service has been fully serialized and deserialized .
} catch (final java.io.IOException e) {
logger.error(e.getMessage(), e);
}
}
但是当我尝试通过我的 Spark 启动器调用此服务时,无论我是否广播服务对象,一些内部集合(HashMap)都会消失(未序列化),就像它被标记为 "transient"(但它既不是瞬态的也不是静态的):
JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new Function<InputOjbect, OutputObject>() {
private static final long serialVersionUID = 1L;
public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception
MyOutput output = service.evaluate(input);
return (new OutputObject(output));
}
});
广播服务结果相同:
final Broadcast<MyService> broadcastedService = sprkCtx.broadcast(service);
JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new Function<InputOjbect, OutputObject>() {
private static final long serialVersionUID = 1L;
public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception
MyOutput output = broadcastedService.getValue().evaluate(input);
return (new OutputObject(output));
}
});
如果我在本地模式而不是 yarn 集群模式下启动相同的 Spark 代码,它会完美运行。
所以我的问题是:Spark Serialization 和 Java Serialization 有什么区别? (我没有使用 Kryo 或任何自定义序列化)。
编辑:当我尝试使用 Kryo 序列化程序时(没有明确注册任何 class),我遇到了同样的问题。
好的,感谢我们的一位实验数据分析师,我发现了它。
那么,这个谜团是关于什么的?
- 这与序列化无关(java 或 Kryo)
- 这与一些预处理或 post 处理无关,Spark 会进行 before/after 序列化
- 这与完全可序列化的 HashMap 字段无关(如果您阅读了我给出的第一个示例,这一点很明显,但不是对每个人都适用;)
所以...
整个问题是关于这个的:
"if I launch this same Spark code in local mode instead of yarn cluster
mode, it works perfectly."
在 "yarn cluster" 模式下,无法初始化集合,因为它是在随机节点上启动的,无法访问磁盘上的初始引用数据。在本地模式下,当在磁盘上找不到初始数据时有一个明显的异常,但在集群模式下它完全没有声音,看起来问题是关于序列化的。
使用 "yarn client" 模式为我们解决了这个问题。
我正在使用 Spark + Yarn,我有一个服务想在分布式节点上调用。
当我在使用 java 序列化的 Junit 测试中序列化此服务对象 "by hand" 时,服务的所有内部集合都很好地序列化和反序列化:
@Test
public void testSerialization() {
try (
ConfigurableApplicationContext contextBusiness = new ClassPathXmlApplicationContext("spring-context.xml");
FileOutputStream fileOutputStream = new FileOutputStream("myService.ser");
ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
) {
final MyService service = (MyService) contextBusiness.getBean("myServiceImpl");
objectOutputStream.writeObject(service);
objectOutputStream.flush();
} catch (final java.io.IOException e) {
logger.error(e.getMessage(), e);
}
}
@Test
public void testDeSerialization() throws ClassNotFoundException {
try (
FileInputStream fileInputStream = new FileInputStream("myService.ser");
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
) {
final MyService myService = (MyService) objectInputStream.readObject();
// HERE a functionnal test who proves the service has been fully serialized and deserialized .
} catch (final java.io.IOException e) {
logger.error(e.getMessage(), e);
}
}
但是当我尝试通过我的 Spark 启动器调用此服务时,无论我是否广播服务对象,一些内部集合(HashMap)都会消失(未序列化),就像它被标记为 "transient"(但它既不是瞬态的也不是静态的):
JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new Function<InputOjbect, OutputObject>() {
private static final long serialVersionUID = 1L;
public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception
MyOutput output = service.evaluate(input);
return (new OutputObject(output));
}
});
广播服务结果相同:
final Broadcast<MyService> broadcastedService = sprkCtx.broadcast(service);
JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new Function<InputOjbect, OutputObject>() {
private static final long serialVersionUID = 1L;
public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception
MyOutput output = broadcastedService.getValue().evaluate(input);
return (new OutputObject(output));
}
});
如果我在本地模式而不是 yarn 集群模式下启动相同的 Spark 代码,它会完美运行。
所以我的问题是:Spark Serialization 和 Java Serialization 有什么区别? (我没有使用 Kryo 或任何自定义序列化)。
编辑:当我尝试使用 Kryo 序列化程序时(没有明确注册任何 class),我遇到了同样的问题。
好的,感谢我们的一位实验数据分析师,我发现了它。
那么,这个谜团是关于什么的?
- 这与序列化无关(java 或 Kryo)
- 这与一些预处理或 post 处理无关,Spark 会进行 before/after 序列化
- 这与完全可序列化的 HashMap 字段无关(如果您阅读了我给出的第一个示例,这一点很明显,但不是对每个人都适用;)
所以...
整个问题是关于这个的:
"if I launch this same Spark code in local mode instead of yarn cluster mode, it works perfectly."
在 "yarn cluster" 模式下,无法初始化集合,因为它是在随机节点上启动的,无法访问磁盘上的初始引用数据。在本地模式下,当在磁盘上找不到初始数据时有一个明显的异常,但在集群模式下它完全没有声音,看起来问题是关于序列化的。
使用 "yarn client" 模式为我们解决了这个问题。