Flink 1.12.2 modifying savepoint locally - metadata has absolute paths
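I am trying to modify an existing savepoint that was created with Flink 1.12.2 and Ververica 2.4.1 and is stored on S3.
The steps I have taken so far:
- copied the savepoint, including "_metadata" and the savepoint files, from S3 to my local machine;
- opened the Flink state and read the state of the operator I am interested in;
- created and modified a dataset that I want to replace that operator's state with;
- am now trying to modify the state with the following code: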
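import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

// Bootstrap the replacement keyed state from the modified dataset.
BootstrapTransformation<AccountRegistrationInformation> transformation = OperatorTransformation
    .bootstrapWith(accountDataSet)
    .keyBy(acc -> acc.getBrand() + "-" + acc.getAccountId())
    .transform(new AccountRegistrationBootstrapper());
// Load the local copy, swap the operator state, and write a new savepoint.
// Backslashes in Java string literals must be escaped ("\\").
Savepoint.load(executionEnvironment, "C:\\flinkState", new MemoryStateBackend())
    .removeOperator("registration-processor")
    .withOperator("registration-processor", transformation)
    .write("C:\\flinkState\\transformed");
executionEnvironment.execute();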
When I run the code above, it modifies a subset of the dataset and then Flink throws the following exception:
Caused by: java.io.FileNotFoundException: \<redacted>\savepoint-c680a3-c178150a8b8dc44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)
When inspecting _metadata, I noticed that it contains absolute paths in S3:
s3://<redacted>/savepoint-c680a3-c178150a8b8d/32c44059-1f59-4091-bcb5-3e1efa369ec6
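For anyone who wants to repeat this check, here is a rough sketch of how such references could be listed from a local copy of _metadata. It is only a heuristic, not a parser for the metadata format: it treats the file as raw bytes and collects printable runs that start with a given prefix, so a result may include trailing bytes that are not part of the path. The class name MetadataPathScan and the method findReferences are made up for illustration and are not part of Flink.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists printable runs in a binary file that start with a prefix. */
final class MetadataPathScan {

    static List<String> findReferences(byte[] metadata, String prefix) {
        if (prefix.isEmpty()) {
            throw new IllegalArgumentException("prefix must not be empty");
        }
        // ISO-8859-1 maps every byte to exactly one char, so no bytes are lost.
        String text = new String(metadata, StandardCharsets.ISO_8859_1);
        List<String> found = new ArrayList<>();
        int from = 0;
        while ((from = text.indexOf(prefix, from)) >= 0) {
            int end = from;
            // Extend the match while the characters are printable ASCII (no spaces).
            while (end < text.length() && text.charAt(end) > 0x20 && text.charAt(end) < 0x7f) {
                end++;
            }
            found.add(text.substring(from, end));
            // Always move forward, even if the prefix itself is not printable.
            from = Math.max(end, from + prefix.length());
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        // Path to the local _metadata copy; defaults to the current directory.
        Path metadata = Paths.get(args.length > 0 ? args[0] : "_metadata");
        if (!Files.isRegularFile(metadata)) {
            System.err.println("No such file: " + metadata);
            return;
        }
        for (String ref : findReferences(Files.readAllBytes(metadata), "s3://")) {
            System.out.println(ref);
        }
    }
}
```

Running it against the copied _metadata prints every run that starts with s3://, which is enough to see whether the file still points back at the bucket.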
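What I want is to write the modified savepoint to my local machine and then manually move it to S3, so that Flink can start from the modified state.
Could anyone share their experience?
Full exception: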
10:09:25,169 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
10:09:25,170 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
10:09:25,171 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
10:09:25,176 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (1 channels)
10:09:25,178 ERROR org.apache.flink.runtime.operators.DataSinkTask [] - Error in user code: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8dc44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified): DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)
java.io.FileNotFoundException: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8dc44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)
at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_282]
at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_282]
at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_282]
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:61) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:34) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:235) [flink-runtime_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.2.jar:1.12.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
10:09:25,223 WARN org.apache.flink.runtime.taskmanager.Task [] - DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)#0 (d4b998c90a0fc21a64f463b6476e85aa) switched from RUNNING to FAILED.
java.io.FileNotFoundException: \<redacted>\savepoints\d18b311a-86e8-4406-93b5-f2b398c4257f\savepoint-c680a3-c178150a8b8dc44059-1f59-4091-bcb5-3e1efa369ec6 (The system cannot find the path specified)
at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_282]
at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_282]
at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_282]
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87) ~[flink-core-1.12.2.jar:1.12.2]
at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:61) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:34) ~[flink-state-processor-api_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:235) ~[flink-runtime_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.2.jar:1.12.2]
10:09:25,224 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for DataSink (org.apache.flink.state.api.output.FileCopyFunction@da28d03) (1/1)#0 (d4b998c90a0fc21a64f463b6476e85aa).
10:09:25,255 INFO org.apache.flink.runtime.taskmanager.Task [] - MapPartition (2861c3d1e95af557df2962264aaf94ef) (6/8)#0 (ab4fcd08aa51c77eec1ac6d3c9fba2d3) switched from RUNNING to FINISHED.
10:09:25,255 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for MapPartition (2861c3d1e95af557df2962264aaf94ef) (6/8)#0 (ab4fcd08aa51c77eec1ac6d3c9fba2d3).
10:09:25,255 INFO org.apache.flink.runtime.taskmanager.Task [] - MapPartition (2861c3d1e95af557df2962264aaf94ef) (8/8)#0 (c0105262e4e271633df686c1b09476a9) switched from RUNNING to FINISHED.
10:09:25,256 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for MapPartition (2861c3d1e95af557df2962264aaf94ef) (8/8)#0 (c0105262e4e271633df686c1b09476a9).
Absolute paths in _metadata can be pointers to inline state, i.e. state stored directly in _metadata. State stored in data files should have relative paths.
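To illustrate why that distinction matters once a savepoint has been copied, here is a small sketch. It uses java.net.URI only and is not Flink's actual resolution code: a relative reference follows the savepoint directory wherever it is moved, while an absolute one keeps pointing at the original location. The class SavepointPathSketch, the file name state-file-1 and the bucket example-bucket are hypothetical.

```java
import java.net.URI;

/** Hypothetical illustration of relative vs. absolute state-file references. */
final class SavepointPathSketch {

    /**
     * Resolves a reference against the directory the savepoint was loaded from.
     * A reference with its own scheme (for example s3://...) is returned unchanged.
     */
    static URI resolve(URI savepointDir, String reference) {
        return savepointDir.resolve(reference);
    }

    public static void main(String[] args) {
        // The trailing slash matters: without it the last path segment is replaced.
        URI local = URI.create("file:///C:/flinkState/");

        // Relative reference: ends up inside the local copy.
        URI relative = resolve(local, "state-file-1");
        System.out.println(relative.getScheme() + " " + relative.getPath());

        // Absolute reference: still points at the original bucket.
        URI absolute = resolve(local, "s3://example-bucket/savepoint-x/state-file-1");
        System.out.println(absolute.getScheme() + " " + absolute.getPath());
    }
}
```

If a data-file reference in your _metadata behaves like the second case, a job reading the local copy would still go looking for the file at the original location.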
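What do you use as 'C:\flinkState' in your code, and what do you see in place of "<redacted>" in the FileNotFoundException? If they are sensitive, could you give an example?
Also, have you tried this on a Linux machine?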
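Update:
The stack trace you added looks similar to the one in https://issues.apache.org/jira/browse/FLINK-23429. Could you try adding the State Processor API dependency from Flink 1.12.5 to your savepoint transformation job?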
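If the transformation job is built with Maven (an assumption, the question does not say), pinning only the State Processor API to 1.12.5 might look like the fragment below. The artifact name and Scala suffix are taken from the jar that appears in the stack trace (flink-state-processor-api_2.11). Whether it works alongside the 1.12.2 runtime in your setup would still need to be verified.

```xml
<!-- Assumes a Maven build; replaces the 1.12.2 version of the same artifact. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.12.5</version>
</dependency>
```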