Flink 增量检查点 - 数据在共享文件夹中保存多长时间
Flink incremental checkpointing - how long does data is kept in the share folder
我们正在使用 Flink 1.6.3 并将检查点保留在 CEPH 中,一次只保留一个检查点,使用增量并使用 rocksdb。
我们 运行 windows 迟了 3 天,这意味着我们预计检查点共享文件夹中的数据将在 3-4 天后保留,但我们仍然看到有来自更多的数据
例如
如果今天是 7 月 4 日,则有 2 月 4 日的一些文件
有时我们看到我们假设的检查点(由于其索引号不同步)它属于一个崩溃的作业并且检查点未用于恢复作业
我的问题是
- 为什么我们从延迟配置中看到较旧的数据
- 我怎么知道这些文件属于一个有效的检查点而不是一个破碎作业的检查点 - 所以我们可以删除这些文件
经过调查并在唐云(apache-flink-user-mailing-list)的协助下
我创建了以下代码
metadataPath - checkpoint/savepoint 文件夹中的 _metadata 文件的路径
这是在 flink 版本 1.6.3
上测试的
DataInputStream in = new DataInputStream(new FileInputStream(metadataPath));
final Savepoint savepoint = Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());
final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
.flatMap(operatorState -> operatorState.getSubtaskStates().values().stream()
.flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream()
.flatMap(keyedStateHandle -> Stream.concat(((IncrementalKeyedStateHandle) keyedStateHandle).getSharedState().values().stream(),
((IncrementalKeyedStateHandle) keyedStateHandle).getPrivateState().values().stream())
.map(streamStateHandle -> {
String name = null;
if (streamStateHandle instanceof FileStateHandle) {
name = ((FileStateHandle) streamStateHandle).getFilePath().getName();
} else {
final String handleName = ((ByteStreamStateHandle) streamStateHandle).getHandleName();
name = new File(handleName).getName();
}
return name.trim();
})
)
)
)
.collect(Collectors.toSet());
System.out.println("pathSharedFromMetadata:" + pathSharedFromMetadata);
我们正在使用 Flink 1.6.3 并将检查点保留在 CEPH 中,一次只保留一个检查点,使用增量并使用 rocksdb。
我们 运行 windows 迟了 3 天,这意味着我们预计检查点共享文件夹中的数据将在 3-4 天后保留,但我们仍然看到有来自更多的数据
例如
如果今天是 7 月 4 日,则有 2 月 4 日的一些文件
有时我们看到我们假设的检查点(由于其索引号不同步)它属于一个崩溃的作业并且检查点未用于恢复作业
我的问题是
- 为什么我们从延迟配置中看到较旧的数据
- 我怎么知道这些文件属于一个有效的检查点而不是一个破碎作业的检查点 - 所以我们可以删除这些文件
经过调查并在唐云(apache-flink-user-mailing-list)的协助下
我创建了以下代码
metadataPath - checkpoint/savepoint 文件夹中的 _metadata 文件的路径
这是在 flink 版本 1.6.3
DataInputStream in = new DataInputStream(new FileInputStream(metadataPath));
final Savepoint savepoint = Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());
final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
.flatMap(operatorState -> operatorState.getSubtaskStates().values().stream()
.flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream()
.flatMap(keyedStateHandle -> Stream.concat(((IncrementalKeyedStateHandle) keyedStateHandle).getSharedState().values().stream(),
((IncrementalKeyedStateHandle) keyedStateHandle).getPrivateState().values().stream())
.map(streamStateHandle -> {
String name = null;
if (streamStateHandle instanceof FileStateHandle) {
name = ((FileStateHandle) streamStateHandle).getFilePath().getName();
} else {
final String handleName = ((ByteStreamStateHandle) streamStateHandle).getHandleName();
name = new File(handleName).getName();
}
return name.trim();
})
)
)
)
.collect(Collectors.toSet());
System.out.println("pathSharedFromMetadata:" + pathSharedFromMetadata);