卡夫卡轮询无记录的正确方法

Kafka Proper Way to Poll No Records

为了让我的消费者保持活跃(非常长的可变长度处理),我在后台线程中实现了一个空的 poll() 调用,如果我在 polls() 之间花费太多时间,它将阻止代理重新平衡。我已将我的轮询间隔设置得非常长,但我不想为了越来越长的处理而永远增加它。

轮询无记录的正确方法是什么?目前我正在调用 poll(),然后重新寻找 poll call() 中返回的每个分区的最早偏移量,以便主线程在处理完之前的消息后可以正确读取它们。

ConsumerRecords<String, String> msgs = kafkaConsumer.poll(timeout);
Map<Integer, Long> partitionToOffsets = getEarliestPartitionOffsets(msgs); // helper method
seekToOffsets(partitionToOffsets);

处理长处理时间(并避免消费者重新平衡)的正确方法是使用 KafkaConsumer.pause() / KafkaConsumer.resume() 方法。您可以在这里阅读更多相关信息: