有没有办法在执行流之前以编程方式检查 Flink 流作业是否从保存点开始?

Is there a way to programmatically check if a Flink streaming job started from a savepoint before executing the stream?

StreamExecutionEnvironment 上调用 execute 并启动流作业之前,有没有办法以编程方式查明作业是否从保存点恢复?我需要知道这些信息,以便在构建作业图时可以根据它设置 Kafka 源的偏移量。

似乎具有方法initializeStateFlinkConnectorKafkaBaseclass可以访问此类信息(code)。但是,无法拦截 FunctionInitializationContext 并检索 isRestored() 值,因为 initializeState 是一个 final 方法。此外,initializeState 方法在作业图执行后被调用,因此我认为没有与之相关的可行解决方案。

我做的另一个尝试是找到一个 Flink 作业参数,该参数指示作业是否从保存点启动。但是,我认为不存在这样的参数。

您只需执行以下操作即可获得您想要的效果:

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest(); 

如果使用setStartFromEarliest,那么Flink会忽略Kafka中存储的偏移量,而是从最早的记录开始读取。此外,即使您使用 setStartFromEarliest,如果 Flink 从检查点或保存点恢复,它也会使用存储在该快照中的偏移量。

请注意,Flink 进行自己的 Kafka 偏移量管理,并且在从检查点恢复时会忽略存储在 Kafka 中的偏移量。 Flink 将此作为提供精确一次保证的一部分,这需要确切知道有多少输入被消耗以产生在检查点或保存点中捕获的其余状态中存在的结果。因此,Flink 始终将偏移量存储为每个状态快照(检查点或保存点)的一部分。

这已记录在案 here and here

至于你最初关于 initializeState 的问题,如果你实现了 CheckpointedFunction 接口,这是可用的,但实际上很少需要这个。