Can't restore savepoint from 1.2.1 to 1.4
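We have deployed a new Flink instance running version 1.4.
When we try to restore savepoints from our old 1.2.1 deployment, every job we try to restore fails with the same error: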
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob.apply$mcV$sp(JobManager.scala:1360)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob.apply(JobManager.scala:1336)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob.apply(JobManager.scala:1336)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserializeSubtaskState(SavepointV1Serializer.java:171)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:96)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:54)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:278)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:70)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1141)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob.apply$mcV$sp(JobManager.scala:1350)
... 10 more
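The error message:
Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!
That seems wrong, though, since our other deployment is running 1.2.1.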
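The documentation page for 1.4 has still not been updated: https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html but it seems that parallelism has been an issue in the past. I have tried using the same parallelism as the job that produced the savepoint, but I still hit the same problem.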
Any hints about what could be causing this and how to fix it?
Thanks!
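As of version 1.4.0, Flink no longer supports restoring state that was written through the Checkpointed interface. To upgrade your state, you have to do the following:
- Take a savepoint of your job running on Flink 1.2.1
- Replace Checkpointed with CheckpointedFunction in all stateful functions
- Implement the CheckpointedRestoring interface so the functions can restore from the Checkpointed savepoint
- Run the modified job on Flink 1.2.1 and take a second savepoint
- Remove the CheckpointedRestoring interface from all stateful functions
- Run the modified job on Flink 1.4.0 using the second savepoint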
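To make the second and third steps above more concrete, here is a minimal, self-contained sketch of what the intermediate version of a stateful function does. It is only a model: ListStateLike and LegacyRestoring are hypothetical stand-ins, declared so the example compiles without Flink on the classpath, and their signatures are simplified. In a real job you would implement Flink's CheckpointedFunction (whose initializeState and snapshotState methods take context objects) and CheckpointedRestoring instead. The order in which Flink invokes these hooks is not modeled here.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MigrationSketch {

    /** Hypothetical stand-in for Flink's ListState, reduced to the calls used below. */
    interface ListStateLike<T> {
        void add(T value);
        Iterable<T> get();
        void clear();
    }

    /** Hypothetical stand-in for Flink's CheckpointedRestoring: a read-only legacy hook. */
    interface LegacyRestoring<T extends Serializable> {
        void restoreState(T legacyState);
    }

    /** Trivial in-memory state so the sketch runs without a cluster. */
    static class InMemoryListState<T> implements ListStateLike<T> {
        private final List<T> values = new ArrayList<>();
        @Override public void add(T value) { values.add(value); }
        @Override public Iterable<T> get() { return values; }
        @Override public void clear() { values.clear(); }
    }

    /** Intermediate function version: reads the old snapshot, writes new-style state. */
    static class BufferingFunction implements LegacyRestoring<ArrayList<String>> {
        private final List<String> buffer = new ArrayList<>();
        private ListStateLike<String> checkpointedState;

        /** Models initializeState: keep the state handle and reload whatever it holds. */
        void initializeState(ListStateLike<String> state) {
            checkpointedState = state;
            for (String element : state.get()) {
                buffer.add(element);
            }
        }

        /** Models snapshotState: copy the in-memory buffer into the new-style state. */
        void snapshotState() {
            checkpointedState.clear();
            for (String element : buffer) {
                checkpointedState.add(element);
            }
        }

        /** Legacy path: only used when starting from the old Checkpointed savepoint. */
        @Override
        public void restoreState(ArrayList<String> legacyState) {
            buffer.addAll(legacyState);
        }

        List<String> bufferedElements() {
            return new ArrayList<>(buffer);
        }
    }

    public static void main(String[] args) {
        // Pretend this list is the content of the first (legacy) savepoint.
        ArrayList<String> legacySnapshot = new ArrayList<>(Arrays.asList("a", "b"));
        InMemoryListState<String> newStyleState = new InMemoryListState<>();

        // Intermediate job: restore from the legacy snapshot, then take the second savepoint.
        BufferingFunction intermediate = new BufferingFunction();
        intermediate.initializeState(newStyleState);
        intermediate.restoreState(legacySnapshot);
        intermediate.snapshotState();

        // Final job: only the new-style state is read; the legacy hook is never called.
        BufferingFunction finalVersion = new BufferingFunction();
        finalVersion.initializeState(newStyleState);
        System.out.println(finalVersion.bufferedElements());
    }
}
```

Once the second savepoint has been written through the new-style state, nothing depends on the legacy restoreState hook any more, which is why it can be removed before moving to 1.4.0.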
Let me know if you have further questions while migrating your job.
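So, we finally figured out the problem.
We started running our jobs on Flink 1.1 and then migrated their savepoints to 1.2.1.
It seems Flink 1.2.1 did not upgrade the savepoints in any way, so they were still in the old format, which Flink 1.4 does not support.
The solution was to run our jobs with their savepoints on Flink 1.3 and take a new savepoint there, which is written in the new format. That one is finally compatible with Flink 1.4 :)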
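Since several jobs needed the same treatment, the hop through 1.3 could be scripted. The sketch below only builds the command lines for the Flink CLI (`flink run -s <savepointPath> <jar>` and `flink savepoint <jobId> <targetDirectory>`) and does not execute anything. The installation directories, savepoint paths, jar name and job ID are all placeholders I made up for illustration; the real job ID and the path of the new savepoint come from the CLI output at run time.

```java
import java.util.Arrays;
import java.util.List;

public class SavepointHopCommands {

    /** Builds `flink run -s <savepointPath> <jar>`: start a job from an existing savepoint. */
    static List<String> resumeFromSavepoint(String flinkHome, String savepointPath, String jobJar) {
        return Arrays.asList(flinkHome + "/bin/flink", "run", "-s", savepointPath, jobJar);
    }

    /** Builds `flink savepoint <jobId> <targetDirectory>`: trigger a savepoint of a running job. */
    static List<String> triggerSavepoint(String flinkHome, String jobId, String targetDirectory) {
        return Arrays.asList(flinkHome + "/bin/flink", "savepoint", jobId, targetDirectory);
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the original post.
        String flink13 = "/opt/flink-1.3";
        String flink14 = "/opt/flink-1.4";
        String jobJar = "my-job.jar";
        String oldSavepoint = "/savepoints/from-1.2.1";
        String newSavepointDir = "/savepoints/from-1.3";
        String jobIdOn13 = "<jobId printed by the 1.3 cluster>";
        String newSavepoint = "<path printed by the savepoint command>";

        // 1. Resume the old savepoint on the intermediate 1.3 cluster.
        System.out.println(String.join(" ", resumeFromSavepoint(flink13, oldSavepoint, jobJar)));
        // 2. Take a fresh savepoint there, which is written in the newer format.
        System.out.println(String.join(" ", triggerSavepoint(flink13, jobIdOn13, newSavepointDir)));
        // 3. Resume that fresh savepoint on the 1.4 cluster.
        System.out.println(String.join(" ", resumeFromSavepoint(flink14, newSavepoint, jobJar)));
    }
}
```

Each list can be passed to a ProcessBuilder if you want to run the commands from Java rather than copy them into a shell.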