Flink 检查点不断失败

Flink checkpoints keeps failing

我们正在尝试使用 RocksDB 后端设置 Flink 有状态作业。 我们使用 session window,间隔 30 分钟。我们使用 aggregateFunction,因此不使用任何 Flink 状态变量。 通过抽样,我们只有不到 20k events/s、20 - 30 个新 sessions/s。我们的 session 基本上收集了所有事件。 session 累加器的大小会随着时间增加。 我们在 Flink 1.9 和 128 个容器中总共使用了 10G 内存。 以下是设置:

state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice0/myjob/path
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.4
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1

containerized.heap-cutoff-ratio: 0.45
taskmanager.network.memory.fraction: 0.5
taskmanager.network.memory.min: 512mb
taskmanager.network.memory.max: 2560mb

根据我们对给定时间的监控, rocksdb memtable 大小小于 10m, 我们的堆使用量不到 1G,但我们的直接内存使用量(网络缓冲区)使用了 2.5G。缓冲池/缓冲区使用指标都为 1(已满)。 我们的检查点不断失败, 请问网络buffer部分占用这么多内存正常吗?

如果您能提出一些建议,我将不胜感激:) 谢谢!

就其价值而言,会话 windows 确实在内部使用了 Flink 状态。 (大多数源和接收器也是如此。)根据您将会话事件收集到会话累加器中的方式,这可能是一个性能问题。如果您需要将所有事件收集在一起,为什么要使用 AggregateFunction 来执行此操作,而不是让 Flink 为您执行此操作?

为了获得最佳 windowing 性能,您希望使用递增 reduces/aggregates window 的 ReduceFunction 或 AggregateFunction,仅保留一小部分状态,最终window 的结果。另一方面,如果您只使用一个没有 pre-aggregation 的 ProcessWindowFunction,那么 Flink 将在内部使用一个附加列表状态对象,当与 RocksDB 一起使用时该对象非常有效——它只需要序列化每个事件以将其附加到列表的末尾。当 window 最终被触发时,列表将作为以块为单位反序列化的 Iterable 交付给您。另一方面,如果您使用 AggregateFunction 推出自己的解决方案,您可能会让 RocksDB 在每个 access/update 上反序列化和重新序列化累加器。这可能会变得非常昂贵,并且可以解释检查点失败的原因。

您分享的另一个有趣的事实是缓冲池/缓冲区使用指标显示它们已被充分利用。这表明背压很大,这反过来可以解释检查点失败的原因。检查点依赖于检查点障碍能够遍历整个执行图,在每个操作符运行时检查点,并在超时之前完成对作业的全面扫描。有背压,这可能会失败。

背压的最常见原因是 under-provisioning——或者换句话说,使集群不堪重负。网络缓冲池变得充分利用,因为运营商无法跟上。答案不是增加缓冲,而是 remove/fix 瓶颈。