Flink 模式演变不适用于 POJO class
Flink schema evolution not working for POJO class
我有一个 class 满足被视为 POJO 的要求,
这是我的流媒体工作中的主要传输方式 class
(它只包含基元和一个 Map<String, String>
)。
我添加了一个新的 String
字段和相应的 getter 和 setter,
但是如果我停止使用以前的 class 和保存点的作业并尝试使用它重新启动新的 class,
我得到一个例外:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3b2dbb810ac7d55949cb205a3075facc_(8/8) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend(StreamTaskStateInitializerImpl.java:291)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
...
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader[=11=](StateTableByKeyGroupReaders.java:77)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 12 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 23 common frames omitted
出于某种原因,它又回到了 Kryo。
我使用的是 Flink 1.9.3,according to the documentation应该支持这个改动。
根据 David 的回答,我正在尝试查看是否可以在 将新字段添加到 class 之前动态迁移我的状态 。
我已将 @TypeInfo
注释及其工厂添加到 MyPojo
,
我正在尝试像这样迁移状态:
lsd = new ListStateDescriptor<>(
"newName",
MyPojo.class
);
// migration
TypeToken<LabeledClassWithTimestamp<String>> typeToken = new TypeToken<LabeledClassWithTimestamp<String>>() {};
ListStateDescriptor<LabeledClassWithTimestamp<String>> legacyLSD = new ListStateDescriptor<>(
"oldName",
new KryoSerializer<>((Class<LabeledClassWithTimestamp<String>>) typeToken.getRawType(), runtimeContext.getExecutionConfig())
);
ListState<LabeledClassWithTimestamp<String>> legacyState = runtimeContext.getListState(legacyLSD);
try {
List<MyPojo> newState = new ArrayList<>();
legacyState.get().forEach(o -> newState.add((MyPojo) o));
if (!newState.isEmpty()) {
runtimeContext.getListState(lsd).update(newState);
legacyState.clear();
}
} catch (Exception e) {
LOG.error("Could not migrate state:", e);
}
但是,如果我使用新的 jar 从以前的保存点恢复作业,
Flink 在不同的运算符中抛出一个 StateMigrationException
:
2020-11-08T12:57:59.369Z INFO org.apache.flink.runtime.executiongraph.ExecutionGraph:1511 [flink-akka.actor.default-dispatcher-17] window-operator (1/8) (uid) switched from RUNNING to FAILED. org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
该运算符的状态只包含整数:
public class SlidingWindowProcessFunction extends ProcessWindowFunction<MyPojo, MyOutput, String, TimeWindow> {
private static final long serialVersionUID = 1L;
private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
"window-state", Integer.class);
...
@Override
public void process(String key, Context context, Iterable<MyPojo> iterable, Collector<MyOutput> collector) {
...
for (Integer hash : context.windowState().getListState(LSD).get()) {
alreadyProcessedHashes.add(hash);
}
...
}
}
即使 POJO class 未在其托管状态中直接使用,MyPojo
的序列化程序是否与此其他运算符的状态相关?
Flink 将接受 class 作为有效的 POJO 类型,即使它包含一个在不回退到 Kryo 的情况下无法序列化的字段(例如 LIST 或 MAP)。在这种情况下,不会出现关于 Class <your class> cannot be used as a POJO type ...
的 INFO 日志消息,但是 class 将不会完全支持状态迁移。
Flink 可以处理 POJO 字段中的 LIST 和 MAP 类型,但不会自动处理(以避免破坏向后兼容性)。
您可以通过使用 @TypeInfo
注释您的 class 并为其实现一个 TypeInfoFactory<T>
为每个字段指定正确的 org.apache.flink.api.common.typeinfo.Types
来使它干净地工作,包括org.apache.flink.api.common.typeinfo.Types#MAP
.
可能看起来像这样:
@TypeInfo(MyPojo.MyPojoTypeInfoFactory.class)
public class MyPojo {
private String data;
private HashMap<String, String> attributes;
public static class MyPojoTypeInfoFactory extends TypeInfoFactory<MyPojo> {
@Override
public TypeInformation<MyPojo> createTypeInfo(
Type t, Map<String, TypeInformation<?>> genericParameters) {
Map<String, TypeInformation<?>> fields =
new HashMap<String, TypeInformation<?>>() {
{
put("data", Types.STRING);
put("attributes", Types.MAP(Types.STRING, Types.STRING));
}
};
return Types.POJO(MyPojo.class, fields);
}
}
}
请注意,Types.MAP 字段不能为空。映射中不允许使用空键,但可以使用空值。
我一直无法理解 Flink 内部发生的事情,
但我找到了一种在 2 次升级中实现迁移的方法,
虽然我不太明白为什么会这样。
在第一次升级中,我没有向 POJO 添加任何新字段 class,
但我添加了大卫建议的类型信息。
在我的案例中,关键的是,
因为我最初的工作不能再修改而且 POJO 没有注释,
TypeInfoFactory
中 Map
的类型信息必须指向 Kryo:
put("mapField", Types.GENERIC(Map.class));
然后我添加了一个新的状态描述符而不修改旧的(我的旧状态是根据接口定义的):
ListStateDescriptor<InterfaceWithGeneric<String>> legacyLSD = new ListStateDescriptor<>(
"oldName",
TypeInformation.of(new TypeHint<InterfaceWithGeneric<String>>() {})
);
ListStateDescriptor<MyPojo> newLSD = new ListStateDescriptor<>(
"newName",
MyPojo.class
);
这样我就可以从旧的描述符中读取并根据需要初始化新的描述符。
在第二次升级中,我可以删除旧的描述符并将新字段添加到 POJO,
同时更新 TypeInfoFactory
。
现有地图的序列化将不得不继续使用 Kryo,
因为我找不到修改它的方法。
在一次升级中添加注释、新字段和新描述符对我不起作用。
我也不能重用旧的描述符;
第一次升级会很好,
但是在第二次升级中添加一个新字段再次抛出异常。
我不知道为什么有些异常引用了完全不相关的运算符,
但它似乎只是来自状态恢复后端的错误报告。
我有一个 class 满足被视为 POJO 的要求,
这是我的流媒体工作中的主要传输方式 class
(它只包含基元和一个 Map<String, String>
)。
我添加了一个新的 String
字段和相应的 getter 和 setter,
但是如果我停止使用以前的 class 和保存点的作业并尝试使用它重新启动新的 class,
我得到一个例外:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3b2dbb810ac7d55949cb205a3075facc_(8/8) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend(StreamTaskStateInitializerImpl.java:291)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
...
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader[=11=](StateTableByKeyGroupReaders.java:77)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 12 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 23 common frames omitted
出于某种原因,它又回到了 Kryo。
我使用的是 Flink 1.9.3,according to the documentation应该支持这个改动。
根据 David 的回答,我正在尝试查看是否可以在 将新字段添加到 class 之前动态迁移我的状态 。
我已将 @TypeInfo
注释及其工厂添加到 MyPojo
,
我正在尝试像这样迁移状态:
lsd = new ListStateDescriptor<>(
"newName",
MyPojo.class
);
// migration
TypeToken<LabeledClassWithTimestamp<String>> typeToken = new TypeToken<LabeledClassWithTimestamp<String>>() {};
ListStateDescriptor<LabeledClassWithTimestamp<String>> legacyLSD = new ListStateDescriptor<>(
"oldName",
new KryoSerializer<>((Class<LabeledClassWithTimestamp<String>>) typeToken.getRawType(), runtimeContext.getExecutionConfig())
);
ListState<LabeledClassWithTimestamp<String>> legacyState = runtimeContext.getListState(legacyLSD);
try {
List<MyPojo> newState = new ArrayList<>();
legacyState.get().forEach(o -> newState.add((MyPojo) o));
if (!newState.isEmpty()) {
runtimeContext.getListState(lsd).update(newState);
legacyState.clear();
}
} catch (Exception e) {
LOG.error("Could not migrate state:", e);
}
但是,如果我使用新的 jar 从以前的保存点恢复作业,
Flink 在不同的运算符中抛出一个 StateMigrationException
:
2020-11-08T12:57:59.369Z INFO org.apache.flink.runtime.executiongraph.ExecutionGraph:1511 [flink-akka.actor.default-dispatcher-17] window-operator (1/8) (uid) switched from RUNNING to FAILED. org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
该运算符的状态只包含整数:
public class SlidingWindowProcessFunction extends ProcessWindowFunction<MyPojo, MyOutput, String, TimeWindow> {
private static final long serialVersionUID = 1L;
private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
"window-state", Integer.class);
...
@Override
public void process(String key, Context context, Iterable<MyPojo> iterable, Collector<MyOutput> collector) {
...
for (Integer hash : context.windowState().getListState(LSD).get()) {
alreadyProcessedHashes.add(hash);
}
...
}
}
即使 POJO class 未在其托管状态中直接使用,MyPojo
的序列化程序是否与此其他运算符的状态相关?
Flink 将接受 class 作为有效的 POJO 类型,即使它包含一个在不回退到 Kryo 的情况下无法序列化的字段(例如 LIST 或 MAP)。在这种情况下,不会出现关于 Class <your class> cannot be used as a POJO type ...
的 INFO 日志消息,但是 class 将不会完全支持状态迁移。
Flink 可以处理 POJO 字段中的 LIST 和 MAP 类型,但不会自动处理(以避免破坏向后兼容性)。
您可以通过使用 @TypeInfo
注释您的 class 并为其实现一个 TypeInfoFactory<T>
为每个字段指定正确的 org.apache.flink.api.common.typeinfo.Types
来使它干净地工作,包括org.apache.flink.api.common.typeinfo.Types#MAP
.
可能看起来像这样:
@TypeInfo(MyPojo.MyPojoTypeInfoFactory.class)
public class MyPojo {
private String data;
private HashMap<String, String> attributes;
public static class MyPojoTypeInfoFactory extends TypeInfoFactory<MyPojo> {
@Override
public TypeInformation<MyPojo> createTypeInfo(
Type t, Map<String, TypeInformation<?>> genericParameters) {
Map<String, TypeInformation<?>> fields =
new HashMap<String, TypeInformation<?>>() {
{
put("data", Types.STRING);
put("attributes", Types.MAP(Types.STRING, Types.STRING));
}
};
return Types.POJO(MyPojo.class, fields);
}
}
}
请注意,Types.MAP 字段不能为空。映射中不允许使用空键,但可以使用空值。
我一直无法理解 Flink 内部发生的事情, 但我找到了一种在 2 次升级中实现迁移的方法, 虽然我不太明白为什么会这样。
在第一次升级中,我没有向 POJO 添加任何新字段 class,
但我添加了大卫建议的类型信息。
在我的案例中,关键的是,
因为我最初的工作不能再修改而且 POJO 没有注释,
TypeInfoFactory
中 Map
的类型信息必须指向 Kryo:
put("mapField", Types.GENERIC(Map.class));
然后我添加了一个新的状态描述符而不修改旧的(我的旧状态是根据接口定义的):
ListStateDescriptor<InterfaceWithGeneric<String>> legacyLSD = new ListStateDescriptor<>(
"oldName",
TypeInformation.of(new TypeHint<InterfaceWithGeneric<String>>() {})
);
ListStateDescriptor<MyPojo> newLSD = new ListStateDescriptor<>(
"newName",
MyPojo.class
);
这样我就可以从旧的描述符中读取并根据需要初始化新的描述符。
在第二次升级中,我可以删除旧的描述符并将新字段添加到 POJO,
同时更新 TypeInfoFactory
。
现有地图的序列化将不得不继续使用 Kryo,
因为我找不到修改它的方法。
在一次升级中添加注释、新字段和新描述符对我不起作用。 我也不能重用旧的描述符; 第一次升级会很好, 但是在第二次升级中添加一个新字段再次抛出异常。 我不知道为什么有些异常引用了完全不相关的运算符, 但它似乎只是来自状态恢复后端的错误报告。