flink - 使用匕首注入 - 不可序列化?

flink - using dagger injections - not serializable?

我正在使用 Flink(最新来自 git)从 kafka 流式传输到 cassandra。为了简化单元测试,我通过 Dagger 添加了依赖注入。

ObjectGraph 似乎设置正确,但 'inner objects' 被 Flink 标记为 'not serializable'。如果我直接包含这些对象,它们就可以工作 - 那有什么区别?

有问题的

Class 实现 MapFunction@Inject 一个用于 cassandra 的模块和一个用于读取配置文件的模块。

有没有办法构建它以便我可以使用后期绑定,或者 Flink 是否使这不可能?


编辑:

fwiw - 依赖注入(通过匕首)和 RichMapFunction 不能共存。 Dagger 不允许您包含任何在其定义中具有 extends 的对象。

进一步:

通过 Dagger Lazy 实例化的对象也不会序列化。

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.someapp.SaveMap@2e029d61 not serializable
...
Caused by: java.io.NotSerializableException: dagger.internal.LazyBinding

在深入探讨问题的具体细节之前,先了解一下 Apache Flink 中函数可序列化的背景知识:

可序列化

Apache Flink 使用 Java 序列化 (java.io.Serializable) 将函数对象(此处为 MapFunction)传送给并行执行它们的工作程序。因此,函数需要是可序列化的:该函数可能不包含任何不可序列化的字段,即非原始类型(int、long、double...)且未实现 java.io.Serializable.

处理不可序列化结构的典型方法是延迟初始化它们。

延迟初始化

在 Flink 函数中使用不可序列化类型的一种方法是延迟初始化它们。保存这些类型的字段在函数序列化交付时仍然null,并且只有在函数被工人反序列化后才设置。

  • 在 Scala 中,您可以简单地使用惰性字段,例如 lazy val x = new NonSerializableType()NonSerializableType 类型实际上只在第一次访问变量 x 时创建,通常在 worker 上。因此,该类型可以是不可序列化的,因为当函数被序列化以运送给工人时 x 为 null。

  • 在Java中,你可以在函数的open()方法上初始化不可序列化的字段,如果你把它做成Rich Function。富函数(如 RichMapFunction)是基本函数(此处为 MapFunction)的扩展版本,可让您访问生命周期方法,如 open()close().

惰性依赖注入

我不太熟悉依赖注入,但 dagger 似乎也提供了类似于惰性依赖的东西,这可能有助于作为一种解决方法,就像 Scala 中的惰性变量一样:

new MapFunction<Long, Long>() {

  @Inject Lazy<MyDependency> dep;

  public Long map(Long value) {
    return dep.get().doSomething(value);
  }
}

我遇到了类似的问题。有两种方法可以不反序列化您的依赖项。

  1. 使你的依赖静态化,但这并不总是可行的。它还会打乱您的代码设计。

  2. 使用瞬态:通过将您的依赖项声明为瞬态,您是在说它们不是对象持久状态的一部分,也不应该是序列化的一部分。

public ClassA implements Serializable{
  //class A code here
}

public ClassB{
  //class B code here
}

public class MySinkFunction implements SinkFunction<MyData> {
  private ClassA mySerializableDependency;
  private transient ClassB nonSerializableDependency;
}

这在您使用外部库时特别有用,您无法更改其实现以使其可序列化。