flink - 使用匕首注入 - 不可序列化?
flink - using dagger injections - not serializable?
我正在使用 Flink(最新来自 git)从 kafka 流式传输到 cassandra。为了简化单元测试,我通过 Dagger 添加了依赖注入。
ObjectGraph 似乎设置正确,但 'inner objects' 被 Flink 标记为 'not serializable'。如果我直接包含这些对象,它们就可以工作 - 那有什么区别?
有问题的 Class 实现 MapFunction 和 @Inject 一个用于 cassandra 的模块和一个用于读取配置文件的模块。
有没有办法构建它以便我可以使用后期绑定,或者 Flink 是否使这不可能?
编辑:
fwiw - 依赖注入(通过匕首)和 RichMapFunction 不能共存。 Dagger 不允许您包含任何在其定义中具有 extends 的对象。
进一步:
通过 Dagger Lazy 实例化的对象也不会序列化。
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.someapp.SaveMap@2e029d61 not serializable
...
Caused by: java.io.NotSerializableException: dagger.internal.LazyBinding
在深入探讨问题的具体细节之前,先了解一下 Apache Flink 中函数可序列化的背景知识:
可序列化
Apache Flink 使用 Java 序列化 (java.io.Serializable) 将函数对象(此处为 MapFunction
)传送给并行执行它们的工作程序。因此,函数需要是可序列化的:该函数可能不包含任何不可序列化的字段,即非原始类型(int、long、double...)且未实现 java.io.Serializable
.
处理不可序列化结构的典型方法是延迟初始化它们。
延迟初始化
在 Flink 函数中使用不可序列化类型的一种方法是延迟初始化它们。保存这些类型的字段在函数序列化交付时仍然null
,并且只有在函数被工人反序列化后才设置。
在 Scala 中,您可以简单地使用惰性字段,例如 lazy val x = new NonSerializableType()
。 NonSerializableType
类型实际上只在第一次访问变量 x
时创建,通常在 worker 上。因此,该类型可以是不可序列化的,因为当函数被序列化以运送给工人时 x
为 null。
在Java中,你可以在函数的open()
方法上初始化不可序列化的字段,如果你把它做成Rich Function。富函数(如 RichMapFunction
)是基本函数(此处为 MapFunction
)的扩展版本,可让您访问生命周期方法,如 open()
和 close()
.
惰性依赖注入
我不太熟悉依赖注入,但 dagger 似乎也提供了类似于惰性依赖的东西,这可能有助于作为一种解决方法,就像 Scala 中的惰性变量一样:
new MapFunction<Long, Long>() {
@Inject Lazy<MyDependency> dep;
public Long map(Long value) {
return dep.get().doSomething(value);
}
}
我遇到了类似的问题。有两种方法可以不反序列化您的依赖项。
使你的依赖静态化,但这并不总是可行的。它还会打乱您的代码设计。
使用瞬态:通过将您的依赖项声明为瞬态,您是在说它们不是对象持久状态的一部分,也不应该是序列化的一部分。
public ClassA implements Serializable{
//class A code here
}
public ClassB{
//class B code here
}
public class MySinkFunction implements SinkFunction<MyData> {
private ClassA mySerializableDependency;
private transient ClassB nonSerializableDependency;
}
这在您使用外部库时特别有用,您无法更改其实现以使其可序列化。
我正在使用 Flink(最新来自 git)从 kafka 流式传输到 cassandra。为了简化单元测试,我通过 Dagger 添加了依赖注入。
ObjectGraph 似乎设置正确,但 'inner objects' 被 Flink 标记为 'not serializable'。如果我直接包含这些对象,它们就可以工作 - 那有什么区别?
有问题的Class 实现 MapFunction 和 @Inject 一个用于 cassandra 的模块和一个用于读取配置文件的模块。
有没有办法构建它以便我可以使用后期绑定,或者 Flink 是否使这不可能?
编辑:
fwiw - 依赖注入(通过匕首)和 RichMapFunction 不能共存。 Dagger 不允许您包含任何在其定义中具有 extends 的对象。
进一步:
通过 Dagger Lazy
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.someapp.SaveMap@2e029d61 not serializable
...
Caused by: java.io.NotSerializableException: dagger.internal.LazyBinding
在深入探讨问题的具体细节之前,先了解一下 Apache Flink 中函数可序列化的背景知识:
可序列化
Apache Flink 使用 Java 序列化 (java.io.Serializable) 将函数对象(此处为 MapFunction
)传送给并行执行它们的工作程序。因此,函数需要是可序列化的:该函数可能不包含任何不可序列化的字段,即非原始类型(int、long、double...)且未实现 java.io.Serializable
.
处理不可序列化结构的典型方法是延迟初始化它们。
延迟初始化
在 Flink 函数中使用不可序列化类型的一种方法是延迟初始化它们。保存这些类型的字段在函数序列化交付时仍然null
,并且只有在函数被工人反序列化后才设置。
在 Scala 中,您可以简单地使用惰性字段,例如
lazy val x = new NonSerializableType()
。NonSerializableType
类型实际上只在第一次访问变量x
时创建,通常在 worker 上。因此,该类型可以是不可序列化的,因为当函数被序列化以运送给工人时x
为 null。在Java中,你可以在函数的
open()
方法上初始化不可序列化的字段,如果你把它做成Rich Function。富函数(如RichMapFunction
)是基本函数(此处为MapFunction
)的扩展版本,可让您访问生命周期方法,如open()
和close()
.
惰性依赖注入
我不太熟悉依赖注入,但 dagger 似乎也提供了类似于惰性依赖的东西,这可能有助于作为一种解决方法,就像 Scala 中的惰性变量一样:
new MapFunction<Long, Long>() {
@Inject Lazy<MyDependency> dep;
public Long map(Long value) {
return dep.get().doSomething(value);
}
}
我遇到了类似的问题。有两种方法可以不反序列化您的依赖项。
使你的依赖静态化,但这并不总是可行的。它还会打乱您的代码设计。
使用瞬态:通过将您的依赖项声明为瞬态,您是在说它们不是对象持久状态的一部分,也不应该是序列化的一部分。
public ClassA implements Serializable{ //class A code here } public ClassB{ //class B code here } public class MySinkFunction implements SinkFunction<MyData> { private ClassA mySerializableDependency; private transient ClassB nonSerializableDependency; }
这在您使用外部库时特别有用,您无法更改其实现以使其可序列化。