Flink 状态后端配置与状态处理器 api

Flink state backend config with the state processor api

我使用状态处理器-api,因为它已发布到 bootstrap 我的 flink 状态。我使用 RocksDBStateBackend 并且它有效。 我们最近去了 flink 1.13,不推荐使用 RocksDBStateBackend,取而代之的是 EmbeddedRocksDBStateBackend。

我的问题:

自从 API 的变化和我开发的新 bootstrap 工作后,我得到以下异常:

Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=85356498 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.

在这里我声明我的状态后端:

val backend = new EmbeddedRocksDBStateBackend(true)

在这里我创建了我的保存点:

  Savepoint
    .create(backend, MAX_PARALLELISM)
    .withOperator("my_operator", transformMyOperator)
    .write(savepointPath)

此外,我的 flink 集群配置为使用 RocksDB 状态后端, 和所有其他 flink 拓扑使用 RocksDB 后端。

所以我想知道为什么我得到一个异常,说我不应该使用内存状态后端,因为我使用 RocksDB。欢迎任何帮助。

在 Flink 1.13 中,状态后端的选择与检查点存储提供程序的选择分离。

我猜你之前依赖于 RocksDBStateBackend 构造函数来指定你希望存储检查点的位置。现在您应该在 flink-conf.yaml

中配置它
state.checkpoints.dir: file:///checkpoint-dir/

或在您的代码中

env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");

有关详细信息,请参阅 Migrating from Legacy Backends

是1.13的BUG,请看FLINK-23728,运行1.14.0-RC0帮我解决了问题