Flink读取Kafka,某些情况下消耗速度急剧下降

Flink reads Kafka, consume speed drops dramatically in some case

我们有一个Flink作业(Flink版本:1.9),通过key连接两个kafka源,对每个key,启动一个5分钟的定时器,消息缓存在Flink状态,定时器结束时,合并消息使用相同的密钥(通常,每个密钥有 1~5 条消息)到一个胖密钥并将其发送到卡夫卡。

两个kafka来源:

  1. source1(160个分区,每分钟20~3000万条消息),
  2. source2(30个分区,每分钟1~3百万条消息)。

平面映射只是反序列化kafka消息。

KeyedProcess 是计时器和 Flink 状态发挥作用的地方。

我尝试了一些方法来提高性能,例如modulo key 减少定时器的数量,或者增加硬件(目前2000c 4000gb),或者调整operators的并行度。

目前的问题是,当source1超过每分钟2500万条消息时,消费速度会急剧下降,并且永远无法恢复。如果少于 2500 万 messages/minute.

kafka集群本身似乎没有问题,因为有另一个系统从它读取,并且那个系统没有任何消耗速度问题。

任何人都可以解释一下吗?如何解决原因?或者我可以尝试什么?添加更多硬件是个好主意吗(我认为 2000c 和 4000gb 已经是一个巨大的资源量)?非常感谢。

您可以先附加一个探查器,看看瓶颈在哪里。 (也许是磁盘?)

在某种程度上,RocksDB 不再表现良好似乎是合理的。 可能需要进行一些调整。您应该能够通过 enabling the RocksDB native metrics 获得一些洞察力,并查看问题发生时各种 RocksDB 指标如何变化。

这些是一些更有用的指标:

estimate-live-data-size
estimate-num-keys
num-running-compactions
num-live-versions
estimate-pending-compaction-bytes
num-running-flushes
size-all-mem-tables
block-cache-usage

根据此工作负载的位置和方式 运行,您可能遇到了某种速率限制或节流。请参阅 磁盘对 Flink 中 RocksDB 状态后端的影响:案例研究 举个有趣的例子。