Getting Exception while inspecting flink savepoint using state processor api
I am getting the exception below:

Exception in thread "main" java.lang.IllegalAccessError: class org.apache.flink.state.api.runtime.SavepointLoader tried to access protected method org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(Ljava/lang/String;)Lorg/apache/flink/runtime/state/CompletedCheckpointStorageLocation; (org.apache.flink.state.api.runtime.SavepointLoader and org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage are in unnamed module of loader 'app')
I am using Flink 1.8 with the following Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_2.12</artifactId>
<version>1.9.1</version>
</dependency>
Source code snippet:
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "/home/utlesh/Documents/savepoint", new MemoryStateBackend());
savepoint.readListState("input-events-source-01", "Custom Source", TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>(){}));
The exception is thrown on the second line, which calls the following function:
public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
...
...
}
which in turn calls:
package org.apache.flink.state.api.runtime;
public static Savepoint loadSavepoint(String savepointPath) throws IOException {
CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage
.resolveCheckpointPointer(savepointPath);
try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader());
}
}
which in turn calls:
package org.apache.flink.runtime.state.filesystem;
protected static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
checkNotNull(checkpointPointer, "checkpointPointer");
checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
...
...
}
On closer inspection, SavepointLoader is calling a protected method of a class in a different package. Is this a bug in the Flink Maven artifacts, or am I using them the wrong way?
Is there any other way to deserialize or read Flink savepoints and checkpoints?
It looks like you have a Flink dependency version mismatch: flink-state-processor-api is at 1.9.1 while the rest of your project is on Flink 1.8, so the state-processor classes were compiled against a newer runtime than the one on your classpath.
Add the following dependency to your pom.xml and rebuild, removing the older-version flink-clients dependency from the same file (the Scala suffix, _2.11 vs _2.12, should also be consistent across all Flink artifacts):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.9.1</version>
</dependency>
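More generally, keeping every Flink artifact on one version avoids this class of IllegalAccessError. A minimal pom.xml sketch of that idea, assuming Maven properties are used (the artifact list here is illustrative, not exhaustive):

```xml
<properties>
    <!-- Single source of truth for the Flink version and Scala suffix -->
    <flink.version>1.9.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

Running `mvn dependency:tree -Dincludes=org.apache.flink` afterwards should show every Flink artifact resolving to the same version and Scala suffix.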
The State Processor API can only be used from batch jobs running Flink 1.9 or later, but it can read savepoints and checkpoints written by streaming jobs running older versions of Flink (back to Flink 1.6).