在 java 中限制 kafka 消费者消息的正确方法

Correct way of throttling kafka consumer messages in java

我遇到这样一种情况,其中 1 个线程正在快速消耗来自 Kafka 主题的消息并将它们放入阻塞队列,然后在另一个线程中消耗该队列写入批量插入 mongo数据库集合。我没有看到很多答案,因为这是一个常见问题,我的应用程序崩溃是因为消息 q 变得太大并且内存不足,因为 mongo db writer thread 跟不上消息消耗率。

配置kafka消费者暂停消息消费一段时间直到消息q恢复到合理大小的正确方法是什么。我可以在泳池循环中暂停一下吗?我不这么认为,否则消费者将被标记为不在线,我可以在每次消息 q 变得太大时关闭 Kafka 消费者,然后在它回到可管理的大小时重新连接吗?我可以,但这似乎不是一个干净的解决方案,我正在寻找的是说“嘿卡夫卡,请暂停向我的活跃消费者发送消息,直到我告诉你恢复”,因为这可以让我以最快的速度提取消息我可以将它们插入到我的数据存储中。

请帮忙!

kafka中有pause和resume方法api https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)

如果您检查“消耗流量控制”部分,它说明如下:

Kafka supports dynamic controlling of consumption flows by using pause(Collection) and resume(Collection) to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in the future poll(long) calls.