Flink 状态后端配置与状态处理器 api
Flink state backend config with the state processor api
我使用状态处理器-api,因为它已发布到 bootstrap 我的 flink 状态。我使用 RocksDBStateBackend 并且它有效。
我们最近去了 flink 1.13,不推荐使用 RocksDBStateBackend,取而代之的是 EmbeddedRocksDBStateBackend。
我的问题:
自从 API 的变化和我开发的新 bootstrap 工作后,我得到以下异常:
Caused by: java.io.IOException: Size of the state is larger than the
maximum permitted memory-backed state. Size=85356498 , maxSize=5242880
. Consider using a different state backend, like the File System State
backend.
在这里我声明我的状态后端:
val backend = new EmbeddedRocksDBStateBackend(true)
在这里我创建了我的保存点:
Savepoint
.create(backend, MAX_PARALLELISM)
.withOperator("my_operator", transformMyOperator)
.write(savepointPath)
此外,我的 flink 集群配置为使用 RocksDB 状态后端,
和所有其他 flink 拓扑使用 RocksDB 后端。
所以我想知道为什么我得到一个异常,说我不应该使用内存状态后端,因为我使用 RocksDB。欢迎任何帮助。
在 Flink 1.13 中,状态后端的选择与检查点存储提供程序的选择分离。
我猜你之前依赖于 RocksDBStateBackend 构造函数来指定你希望存储检查点的位置。现在您应该在 flink-conf.yaml
中配置它
state.checkpoints.dir: file:///checkpoint-dir/
或在您的代码中
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
有关详细信息,请参阅 Migrating from Legacy Backends。
是1.13的BUG,请看FLINK-23728,运行1.14.0-RC0帮我解决了问题
我使用状态处理器-api,因为它已发布到 bootstrap 我的 flink 状态。我使用 RocksDBStateBackend 并且它有效。 我们最近去了 flink 1.13,不推荐使用 RocksDBStateBackend,取而代之的是 EmbeddedRocksDBStateBackend。
我的问题:
自从 API 的变化和我开发的新 bootstrap 工作后,我得到以下异常:
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=85356498 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
在这里我声明我的状态后端:
val backend = new EmbeddedRocksDBStateBackend(true)
在这里我创建了我的保存点:
Savepoint
.create(backend, MAX_PARALLELISM)
.withOperator("my_operator", transformMyOperator)
.write(savepointPath)
此外,我的 flink 集群配置为使用 RocksDB 状态后端, 和所有其他 flink 拓扑使用 RocksDB 后端。
所以我想知道为什么我得到一个异常,说我不应该使用内存状态后端,因为我使用 RocksDB。欢迎任何帮助。
在 Flink 1.13 中,状态后端的选择与检查点存储提供程序的选择分离。
我猜你之前依赖于 RocksDBStateBackend 构造函数来指定你希望存储检查点的位置。现在您应该在 flink-conf.yaml
state.checkpoints.dir: file:///checkpoint-dir/
或在您的代码中
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
有关详细信息,请参阅 Migrating from Legacy Backends。
是1.13的BUG,请看FLINK-23728,运行1.14.0-RC0帮我解决了问题