Flink 模式演变不适用于 POJO class

Flink schema evolution not working for POJO class

我有一个 class 满足被视为 POJO 的要求, 这是我的流媒体工作中的主要传输方式 class (它只包含基元和一个 Map<String, String>)。 我添加了一个新的 String 字段和相应的 getter 和 setter, 但是如果我停止使用以前的 class 和保存点的作业并尝试使用它重新启动新的 class, 我得到一个例外:

 java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3b2dbb810ac7d55949cb205a3075facc_(8/8) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
        ... 6 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend(StreamTaskStateInitializerImpl.java:291)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 8 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
...
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader[=11=](StateTableByKeyGroupReaders.java:77)
        at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        ... 12 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.get(ArrayList.java:433)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        ... 23 common frames omitted

出于某种原因,它又回到了 Kryo。

我使用的是 Flink 1.9.3,according to the documentation应该支持这个改动。


根据 David 的回答,我正在尝试查看是否可以在 将新字段添加到 class 之前动态迁移我的状态 。 我已将 @TypeInfo 注释及其工厂添加到 MyPojo, 我正在尝试像这样迁移状态:

lsd = new ListStateDescriptor<>(
        "newName",
        MyPojo.class
);

// migration

TypeToken<LabeledClassWithTimestamp<String>> typeToken = new TypeToken<LabeledClassWithTimestamp<String>>() {};
ListStateDescriptor<LabeledClassWithTimestamp<String>> legacyLSD = new ListStateDescriptor<>(
        "oldName",
        new KryoSerializer<>((Class<LabeledClassWithTimestamp<String>>) typeToken.getRawType(), runtimeContext.getExecutionConfig())
);

ListState<LabeledClassWithTimestamp<String>> legacyState = runtimeContext.getListState(legacyLSD);
try {
    List<MyPojo> newState = new ArrayList<>();
    legacyState.get().forEach(o -> newState.add((MyPojo) o));

    if (!newState.isEmpty()) {
        runtimeContext.getListState(lsd).update(newState);
        legacyState.clear();
    }
} catch (Exception e) {
    LOG.error("Could not migrate state:", e);
}

但是,如果我使用新的 jar 从以前的保存点恢复作业, Flink 在不同的运算符中抛出一个 StateMigrationException :

2020-11-08T12:57:59.369Z INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph:1511 [flink-akka.actor.default-dispatcher-17] window-operator (1/8) (uid) switched from RUNNING to FAILED. org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.

该运算符的状态只包含整数:

public class SlidingWindowProcessFunction extends ProcessWindowFunction<MyPojo, MyOutput, String, TimeWindow> {
    private static final long serialVersionUID = 1L;
    
    private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
            "window-state", Integer.class);
    
    ...
    
    @Override
    public void process(String key, Context context, Iterable<MyPojo> iterable, Collector<MyOutput> collector) {
        ...
        for (Integer hash : context.windowState().getListState(LSD).get()) {
            alreadyProcessedHashes.add(hash);
        }
        ...
    }
}

即使 POJO class 未在其托管状态中直接使用,MyPojo 的序列化程序是否与此其他运算符的状态相关?

Flink 将接受 class 作为有效的 POJO 类型,即使它包含一个在不回退到 Kryo 的情况下无法序列化的字段(例如 LIST 或 MAP)。在这种情况下,不会出现关于 Class <your class> cannot be used as a POJO type ... 的 INFO 日志消息,但是 class 将不会完全支持状态迁移。

Flink 可以处理 POJO 字段中的 LIST 和 MAP 类型,但不会自动处理(以避免破坏向后兼容性)。

您可以通过使用 @TypeInfo 注释您的 class 并为其实现一个 TypeInfoFactory<T> 为每个字段指定正确的 org.apache.flink.api.common.typeinfo.Types 来使它干净地工作,包括org.apache.flink.api.common.typeinfo.Types#MAP.

可能看起来像这样:

@TypeInfo(MyPojo.MyPojoTypeInfoFactory.class)
public class MyPojo {
  private String data;
  private HashMap<String, String> attributes;

  public static class MyPojoTypeInfoFactory extends TypeInfoFactory<MyPojo> {
    @Override
    public TypeInformation<MyPojo> createTypeInfo(
        Type t, Map<String, TypeInformation<?>> genericParameters) {
      Map<String, TypeInformation<?>> fields =
          new HashMap<String, TypeInformation<?>>() {
            {
              put("data", Types.STRING);
              put("attributes", Types.MAP(Types.STRING, Types.STRING));
            }
          };
      return Types.POJO(MyPojo.class, fields);
    }
  }
}

请注意,Types.MAP 字段不能为空。映射中不允许使用空键,但可以使用空值。

我一直无法理解 Flink 内部发生的事情, 但我找到了一种在 2 次升级中实现迁移的方法, 虽然我不太明白为什么会这样。

在第一次升级中,我没有向 POJO 添加任何新字段 class, 但我添加了大卫建议的类型信息。 在我的案例中,关键的是, 因为我最初的工作不能再修改而且 POJO 没有注释, TypeInfoFactoryMap 的类型信息必须指向 Kryo:

put("mapField", Types.GENERIC(Map.class));

然后我添加了一个新的状态描述符而不修改旧的(我的旧状态是根据接口定义的):

ListStateDescriptor<InterfaceWithGeneric<String>> legacyLSD = new ListStateDescriptor<>(
        "oldName",
        TypeInformation.of(new TypeHint<InterfaceWithGeneric<String>>() {})
);

ListStateDescriptor<MyPojo> newLSD = new ListStateDescriptor<>(
        "newName",
        MyPojo.class
);

这样我就可以从旧的描述符中读取并根据需要初始化新的描述符。

在第二次升级中,我可以删除旧的描述符并将新字段添加到 POJO, 同时更新 TypeInfoFactory。 现有地图的序列化将不得不继续使用 Kryo, 因为我找不到修改它的方法。

在一次升级中添加注释、新字段和新描述符对我不起作用。 我也不能重用旧的描述符; 第一次升级会很好, 但是在第二次升级中添加一个新字段再次抛出异常。 我不知道为什么有些异常引用了完全不相关的运算符, 但它似乎只是来自状态恢复后端的错误报告。