如何在 flink 上使用 Ratelimiter?

How to use Ratelimiter on flink?

我想在我的 Flink 作业中限制 Kafka 消费者。

查看 Flink 1.12 的源代码,我发现 FlinkConnectorRateLimiterGuavaFlinkConnectorRateLimiter。但是我找不到任何将此速率限制器连接到 FlinkKafkaConsumer.

的东西

如何在Flink 1.12中对Kafka进行限速?

FlinkConnectorRateLimiter 可用于遗留的 Kafka 消费者 (flink-connector-kafka-0.10),它在 Flink 1.12 中被删除。当前的kafka consumer不提供限速。

查看此邮件列表线程 -- https://lists.apache.org/thread/j7kw131jn0ozmrj763j0hr87b1rj7jop -- 进行一些讨论。简而言之,一旦对背压和事件时间偏差下的检查点的正在进行的改进完成,速率限制就不再有任何吸引力,因此实际上没有任何兴趣增加对速率限制的支持。

但是,上面的邮件列表线程确实包含一个示例,展示了如何通过扩展 FlinkKafkaConsumer 覆盖 emitRecordemitRecordWithTimestamp.[=15= 来自己实现 Kafka 的速率限制]


请注意不要阻塞检查点,这意味着您应该避免在主处理线程中休眠。反序列化模式 运行 在另一个线程中,因此这是进行速率限制的最佳位置。