Spark 序列化和 Java 序列化有什么区别?

What is the difference between Spark Serialization and Java Serialization?

我正在使用 Spark + Yarn,我有一个服务想在分布式节点上调用。

当我在使用 java 序列化的 Junit 测试中序列化此服务对象 "by hand" 时,服务的所有内部集合都很好地序列化和反序列化:

  @Test
  public void testSerialization() {  

    try (
        ConfigurableApplicationContext contextBusiness = new ClassPathXmlApplicationContext("spring-context.xml");
        FileOutputStream fileOutputStream = new FileOutputStream("myService.ser");
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
        ) {

      final MyService service = (MyService) contextBusiness.getBean("myServiceImpl");

      objectOutputStream.writeObject(service);
      objectOutputStream.flush();

    } catch (final java.io.IOException e) {
      logger.error(e.getMessage(), e);
    }
  }

  @Test
  public void testDeSerialization() throws ClassNotFoundException {  

    try (
        FileInputStream fileInputStream = new FileInputStream("myService.ser");
        ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
        ) {

      final MyService myService = (MyService) objectInputStream.readObject();

      // HERE a functionnal test who proves the service has been fully serialized and deserialized      .

    } catch (final java.io.IOException e) {
      logger.error(e.getMessage(), e);
    }
  }  

但是当我尝试通过我的 Spark 启动器调用此服务时,无论我是否广播服务对象,一些内部集合(HashMap)都会消失(未序列化),就像它被标记为 "transient"(但它既不是瞬态的也不是静态的):

JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new   Function<InputOjbect, OutputObject>() {
  private static final long serialVersionUID = 1L;

  public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception

    MyOutput output = service.evaluate(input);
    return (new OutputObject(output));
  }
});

广播服务结果相同:

final Broadcast<MyService> broadcastedService = sprkCtx.broadcast(service);      
JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new   Function<InputOjbect, OutputObject>() {
  private static final long serialVersionUID = 1L;

  public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception

    MyOutput output = broadcastedService.getValue().evaluate(input);
    return (new OutputObject(output));
  }
});

如果我在本地模式而不是 yarn 集群模式下启动相同的 Spark 代码,它会完美运行。

所以我的问题是:Spark Serialization 和 Java Serialization 有什么区别? (我没有使用 Kryo 或任何自定义序列化)。

编辑:当我尝试使用 Kryo 序列化程序时(没有明确注册任何 class),我遇到了同样的问题。

好的,感谢我们的一位实验数据分析师,我发现了它。

那么,这个谜团是关于什么的?

  • 这与序列化无关(java 或 Kryo)
  • 这与一些预处理或 post 处理无关,Spark 会进行 before/after 序列化
  • 这与完全可序列化的 HashMap 字段无关(如果您阅读了我给出的第一个示例,这一点很明显,但不是对每个人都适用;)

所以...

整个问题是关于这个的:

"if I launch this same Spark code in local mode instead of yarn cluster mode, it works perfectly."

在 "yarn cluster" 模式下,无法初始化集合,因为它是在随机节点上启动的,无法访问磁盘上的初始引用数据。在本地模式下,当在磁盘上找不到初始数据时有一个明显的异常,但在集群模式下它完全没有声音,看起来问题是关于序列化的。

使用 "yarn client" 模式为我们解决了这个问题。