Flink 如何高效序列化带有 LocalDate 字段的 POJO?

How to efficiently serialize POJO with LocalDate field in Flink?

我们的一些 POJO 包含来自 java.time API 的字段(LocalDate、LocalDateTime)。当我们的管道正在处理它们时,我们可以在日志中看到以下信息:

org.apache.flink.api.java.typeutils.TypeExtractor - Class class java.time.LocalDate cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

据我了解,LocalDate 不能归类为 POJO,因此 flink 没有使用 POJO 序列化程序,而是回退到效率较低的 Kryo。但是,由于 1.9.0 版本的 flink 具有针对 java.time 类 的专用序列化程序(例如 LocalDateSerializer),因此我希望这些序列化程序能够在此处完成工作,从而允许使用 POJO 序列化程序对于我们的 类。不是这样吗?如果是,是否会影响性能?如果不是,这种情况的最佳解决方案是什么?

在项目中我们使用 Flink 1.11 和 Java 1.8.

由于向后兼容,即使在 Flink 中引入了新的序列化器,也不能自动使用它。但是,您可以像这样告诉 Flink 将其用于您的 POJO(如果您开始时没有使用 Kryo 的先前保存点):

@TypeInfo(MyClassTypeInfoFactory.class)
public class MyClass {
  public int id;
  public LocalDate date;
  // ...
}

public class MyClassTypeInfoFactory extends TypeInfoFactory<MyClass> {
  @Override
  public TypeInformation<MyClass> createTypeInfo(
      Type t, Map<String, TypeInformation<?>> genericParameters) {

    return Types.POJO(MyClass.class, new HashMap<String, TypeInformation<?>>() { {
        put("id", Types.INT);
        put("date", Types.LOCAL_DATE);
        // ...
    } } );
  }
}

您必须如图所示为所有 POJO 字段提供类型,但是 Types class 中有很多您可以使用的助手。此外,通过像这样使用 TypeInfoFactory,您不必担心 Flink 使用此类型的所有地方 - 它总是会派生给定的类型信息。

如果您需要转换旧的保存点以使用新的序列化程序,您可能还想查看 Flink's State Processor API