如何在 flink 上使用 Ratelimiter?
How to use Ratelimiter on flink?
我想在我的 Flink 作业中限制 Kafka 消费者。
查看 Flink 1.12 的源代码,我发现 FlinkConnectorRateLimiter
和 GuavaFlinkConnectorRateLimiter
。但是我找不到任何将此速率限制器连接到 FlinkKafkaConsumer
.
的东西
如何在Flink 1.12中对Kafka进行限速?
FlinkConnectorRateLimiter
可用于遗留的 Kafka 消费者 (flink-connector-kafka-0.10),它在 Flink 1.12 中被删除。当前的kafka consumer不提供限速。
查看此邮件列表线程 -- https://lists.apache.org/thread/j7kw131jn0ozmrj763j0hr87b1rj7jop -- 进行一些讨论。简而言之,一旦对背压和事件时间偏差下的检查点的正在进行的改进完成,速率限制就不再有任何吸引力,因此实际上没有任何兴趣增加对速率限制的支持。
但是,上面的邮件列表线程确实包含一个示例,展示了如何通过扩展 FlinkKafkaConsumer
覆盖 emitRecord
和 emitRecordWithTimestamp
.[=15= 来自己实现 Kafka 的速率限制]
请注意不要阻塞检查点,这意味着您应该避免在主处理线程中休眠。反序列化模式 运行 在另一个线程中,因此这是进行速率限制的最佳位置。
我想在我的 Flink 作业中限制 Kafka 消费者。
查看 Flink 1.12 的源代码,我发现 FlinkConnectorRateLimiter
和 GuavaFlinkConnectorRateLimiter
。但是我找不到任何将此速率限制器连接到 FlinkKafkaConsumer
.
如何在Flink 1.12中对Kafka进行限速?
FlinkConnectorRateLimiter
可用于遗留的 Kafka 消费者 (flink-connector-kafka-0.10),它在 Flink 1.12 中被删除。当前的kafka consumer不提供限速。
查看此邮件列表线程 -- https://lists.apache.org/thread/j7kw131jn0ozmrj763j0hr87b1rj7jop -- 进行一些讨论。简而言之,一旦对背压和事件时间偏差下的检查点的正在进行的改进完成,速率限制就不再有任何吸引力,因此实际上没有任何兴趣增加对速率限制的支持。
但是,上面的邮件列表线程确实包含一个示例,展示了如何通过扩展 FlinkKafkaConsumer
覆盖 emitRecord
和 emitRecordWithTimestamp
.[=15= 来自己实现 Kafka 的速率限制]
请注意不要阻塞检查点,这意味着您应该避免在主处理线程中休眠。反序列化模式 运行 在另一个线程中,因此这是进行速率限制的最佳位置。