Flink读取Kafka,某些情况下消耗速度急剧下降
Flink reads Kafka, consume speed drops dramatically in some case
我们有一个Flink作业(Flink版本:1.9),通过key连接两个kafka源,对每个key,启动一个5分钟的定时器,消息缓存在Flink状态,定时器结束时,合并消息使用相同的密钥(通常,每个密钥有 1~5 条消息)到一个胖密钥并将其发送到卡夫卡。
两个kafka来源:
- source1(160个分区,每分钟20~3000万条消息),
- source2(30个分区,每分钟1~3百万条消息)。
平面映射只是反序列化kafka消息。
KeyedProcess 是计时器和 Flink 状态发挥作用的地方。
我尝试了一些方法来提高性能,例如modulo key 减少定时器的数量,或者增加硬件(目前2000c 4000gb),或者调整operators的并行度。
目前的问题是,当source1超过每分钟2500万条消息时,消费速度会急剧下降,并且永远无法恢复。如果少于 2500 万 messages/minute.
kafka集群本身似乎没有问题,因为有另一个系统从它读取,并且那个系统没有任何消耗速度问题。
任何人都可以解释一下吗?如何解决原因?或者我可以尝试什么?添加更多硬件是个好主意吗(我认为 2000c 和 4000gb 已经是一个巨大的资源量)?非常感谢。
您可以先附加一个探查器,看看瓶颈在哪里。 (也许是磁盘?)
在某种程度上,RocksDB 不再表现良好似乎是合理的。
可能需要进行一些调整。您应该能够通过 enabling the RocksDB native metrics 获得一些洞察力,并查看问题发生时各种 RocksDB 指标如何变化。
这些是一些更有用的指标:
estimate-live-data-size
estimate-num-keys
num-running-compactions
num-live-versions
estimate-pending-compaction-bytes
num-running-flushes
size-all-mem-tables
block-cache-usage
根据此工作负载的位置和方式 运行,您可能遇到了某种速率限制或节流。请参阅 磁盘对 Flink 中 RocksDB 状态后端的影响:案例研究
举个有趣的例子。
我们有一个Flink作业(Flink版本:1.9),通过key连接两个kafka源,对每个key,启动一个5分钟的定时器,消息缓存在Flink状态,定时器结束时,合并消息使用相同的密钥(通常,每个密钥有 1~5 条消息)到一个胖密钥并将其发送到卡夫卡。
两个kafka来源:
- source1(160个分区,每分钟20~3000万条消息),
- source2(30个分区,每分钟1~3百万条消息)。
平面映射只是反序列化kafka消息。
KeyedProcess 是计时器和 Flink 状态发挥作用的地方。
我尝试了一些方法来提高性能,例如modulo key 减少定时器的数量,或者增加硬件(目前2000c 4000gb),或者调整operators的并行度。
目前的问题是,当source1超过每分钟2500万条消息时,消费速度会急剧下降,并且永远无法恢复。如果少于 2500 万 messages/minute.
kafka集群本身似乎没有问题,因为有另一个系统从它读取,并且那个系统没有任何消耗速度问题。
任何人都可以解释一下吗?如何解决原因?或者我可以尝试什么?添加更多硬件是个好主意吗(我认为 2000c 和 4000gb 已经是一个巨大的资源量)?非常感谢。
您可以先附加一个探查器,看看瓶颈在哪里。 (也许是磁盘?)
在某种程度上,RocksDB 不再表现良好似乎是合理的。 可能需要进行一些调整。您应该能够通过 enabling the RocksDB native metrics 获得一些洞察力,并查看问题发生时各种 RocksDB 指标如何变化。
这些是一些更有用的指标:
estimate-live-data-size
estimate-num-keys
num-running-compactions
num-live-versions
estimate-pending-compaction-bytes
num-running-flushes
size-all-mem-tables
block-cache-usage
根据此工作负载的位置和方式 运行,您可能遇到了某种速率限制或节流。请参阅 磁盘对 Flink 中 RocksDB 状态后端的影响:案例研究 举个有趣的例子。