Flink 如何高效序列化带有 LocalDate 字段的 POJO?
How to efficiently serialize POJO with LocalDate field in Flink?
我们的一些 POJO 包含来自 java.time API 的字段(LocalDate、LocalDateTime)。当我们的管道正在处理它们时,我们可以在日志中看到以下信息:
org.apache.flink.api.java.typeutils.TypeExtractor - Class class java.time.LocalDate cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
据我了解,LocalDate 不能归类为 POJO,因此 flink 没有使用 POJO 序列化程序,而是回退到效率较低的 Kryo。但是,由于 1.9.0 版本的 flink 具有针对 java.time 类 的专用序列化程序(例如 LocalDateSerializer
),因此我希望这些序列化程序能够在此处完成工作,从而允许使用 POJO 序列化程序对于我们的 类。不是这样吗?如果是,是否会影响性能?如果不是,这种情况的最佳解决方案是什么?
在项目中我们使用 Flink 1.11 和 Java 1.8.
由于向后兼容,即使在 Flink 中引入了新的序列化器,也不能自动使用它。但是,您可以像这样告诉 Flink 将其用于您的 POJO(如果您开始时没有使用 Kryo 的先前保存点):
@TypeInfo(MyClassTypeInfoFactory.class)
public class MyClass {
public int id;
public LocalDate date;
// ...
}
public class MyClassTypeInfoFactory extends TypeInfoFactory<MyClass> {
@Override
public TypeInformation<MyClass> createTypeInfo(
Type t, Map<String, TypeInformation<?>> genericParameters) {
return Types.POJO(MyClass.class, new HashMap<String, TypeInformation<?>>() { {
put("id", Types.INT);
put("date", Types.LOCAL_DATE);
// ...
} } );
}
}
您必须如图所示为所有 POJO 字段提供类型,但是 Types
class 中有很多您可以使用的助手。此外,通过像这样使用 TypeInfoFactory
,您不必担心 Flink 使用此类型的所有地方 - 它总是会派生给定的类型信息。
如果您需要转换旧的保存点以使用新的序列化程序,您可能还想查看 Flink's State Processor API。
我们的一些 POJO 包含来自 java.time API 的字段(LocalDate、LocalDateTime)。当我们的管道正在处理它们时,我们可以在日志中看到以下信息:
org.apache.flink.api.java.typeutils.TypeExtractor - Class class java.time.LocalDate cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
据我了解,LocalDate 不能归类为 POJO,因此 flink 没有使用 POJO 序列化程序,而是回退到效率较低的 Kryo。但是,由于 1.9.0 版本的 flink 具有针对 java.time 类 的专用序列化程序(例如 LocalDateSerializer
),因此我希望这些序列化程序能够在此处完成工作,从而允许使用 POJO 序列化程序对于我们的 类。不是这样吗?如果是,是否会影响性能?如果不是,这种情况的最佳解决方案是什么?
在项目中我们使用 Flink 1.11 和 Java 1.8.
由于向后兼容,即使在 Flink 中引入了新的序列化器,也不能自动使用它。但是,您可以像这样告诉 Flink 将其用于您的 POJO(如果您开始时没有使用 Kryo 的先前保存点):
@TypeInfo(MyClassTypeInfoFactory.class)
public class MyClass {
public int id;
public LocalDate date;
// ...
}
public class MyClassTypeInfoFactory extends TypeInfoFactory<MyClass> {
@Override
public TypeInformation<MyClass> createTypeInfo(
Type t, Map<String, TypeInformation<?>> genericParameters) {
return Types.POJO(MyClass.class, new HashMap<String, TypeInformation<?>>() { {
put("id", Types.INT);
put("date", Types.LOCAL_DATE);
// ...
} } );
}
}
您必须如图所示为所有 POJO 字段提供类型,但是 Types
class 中有很多您可以使用的助手。此外,通过像这样使用 TypeInfoFactory
,您不必担心 Flink 使用此类型的所有地方 - 它总是会派生给定的类型信息。
如果您需要转换旧的保存点以使用新的序列化程序,您可能还想查看 Flink's State Processor API。