Apache Flink 中的不可序列化对象

Non Serializable object in Apache Flink

我正在使用 Apache Flink 对流数据执行分析。

我正在使用一个 依赖项,它的对象需要超过 10 秒才能创建,因为它在初始化之前会读取 hdfs 中存在的几个文件。

如果我在 open 方法中初始化对象,我会得到超时异常,如果在 sink/flatmap 的构造函数中,我会得到序列化异常。

目前我正在使用静态块初始化其他class中的对象,在主文件中使用Preconditions.checkNotNull(MGenerator.mGenerator)然后如果在 sink 的平面图中使用它就可以工作了。

有没有办法创建一个不可序列化的依赖对象,它可能需要超过 10 秒才能在 Flink 的平面图或接收器中初始化?

public class DependencyWrap {

  static MGenerator mGenerator;

  static {
    final String configStr = "{}";
    final Config config = new Gson().fromJson(config, Config.class);
    mGenerator = new MGenerator(config);
  }

}
public class MyStreaming {

  public static void main(String[] args) throws Exception {

    Preconditions.checkNotNull(MGenerator.mGenerator);
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
    ...
    input.flatMap(new RichFlatMapFunction<Map<String,Object>,List<String>>() {

      @Override
      public void open(Configuration parameters) {
      }

      @Override
      public void flatMap(Map<String,Object> value, Collector<List<String>> out) throws Exception {

        out.collect(MFVGenerator.mfvGenerator.generateMyResult(value.f0, value.f1));
      }

    });

  }
}

另外,如果我对问题的理解有误,请指正。

在 Open 方法中执行此操作是 100% 正确的方法。 Flink 给你的是超时异常,还是对象?

作为最后的方法,您可以将对象包装在 class 中,其中包含对象及其 JSON 字符串或 Config(Config 是否可序列化?),对象标记为 transient 和然后覆盖 ReadObject/WriteObject 方法来调用构造函数。如果 mGenerator 对象本身是无状态的(如果不是,您将遇到其他问题),序列化代码应该只在作业分配给任务管理器时调用一次。

使用 open 通常是加载外部查找源的正确位置。超时有点奇怪,可能有配置吧。

但是,如果使用静态加载器(无论是静态 class 还是单例)的好处是,您只需为同一任务上的所有并行任务实例加载一次经理。因此,您可以节省内存和 CPU 时间。这对您来说尤其如此,因为您在两个单独的任务中使用相同的数据结构。此外,静态加载器可以在第一次使用时延迟初始化,以避免 open.

中的超时。

这种方法的明显缺点是代码的可测试性受到影响。有一些解决方法,如果有兴趣我可以扩展。

我没有看到使用代理序列化程序模式的好处。它不必要地复杂(Java 中的自定义序列化)并且几乎没有什么好处。