java如何从Kafka Consumer一条条获取消息?

How to get messages from Kafka Consumer one by one in java?

我正在使用 Apache Kafka API 并试图一次只获取一条消息。我只写一个主题。我可以通过带有文本框的弹出 UI 屏幕来发送和接收消息。我在文本框中输入一个字符串,然后单击“发送”。我可以发送任意数量的消息。假设我发送了 3 条消息,我的 3 条消息是“嗨”、“哈哈”、“再见”。还有一个“接收”按钮。现在,使用 TutorialsPoint 中的传统代码,当我单击接收按钮时,我会立即在控制台上打印所有 3 条消息(hi、lol、bye)。但是,我只想在单击 UI 上的“接收”时一次打印一条消息。例如,我第一次按下接收按钮时,它会打印“hi”,第二次会是“lol”,第三次会是“bye”。我是卡夫卡的新手,对如何做到这一点感到困惑。我尝试从代码中删除两个循环,所以它只有

ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());

如果我只有这两行代码,我第一次按下接收按钮时,它会打印“hi”,但我第二次按下它时,收到消息“attempt to heartbeat failed since group is rebalancing卡夫卡。”当我设置 max.poll.records = 1 时我也遇到了错误,因为我想要我的所有消息 最终 但是当接收按钮时只需要将其中一个记录到控制台被按下。下一次,将记录主题中未记录的下一条消息。

希望这是有道理的! 感谢任何帮助! 提前致谢!

编辑: 包含队列后的新代码,这样我们就可以在发送和接收消息之间交替,并在有新消息时更新队列:

        if (payloadQueue.isEmpty()){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) {
                log.info("No More Records to Add");
                consumer.close();
            }
            else {
                records.iterator().forEachRemaining(record -> {
                    log.info("RECORD: " + record);
                    payloadQueue.offer(record);
                });
                payload = payloadQueue.poll().value();
                log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
            }
        }
        else {
            payload = payloadQueue.poll().value();
            log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
        }

Kafka使用batch get来提升性能,实在没必要​​设置max.poll.records=1.
通过一些解决方法可以轻松实现您想要的。

解决方案

你可以有一个Queue来存储消息,每次按下接收按钮,你从队列中轮询一条消息,如果队列为空,你调用consumer.poll来填充排队。

代码

    private Queue<ConsumerRecord<String,String>> queue=new LinkedList<>();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    public void buttonPressed(){
        if (queue.isEmpty()){
            consumer.poll(100).iterator().forEachRemaining(record->queue.offer(record));
        }else {
            System.out.println(queue.poll());
        }
    }

您应该按照@haoyu 的建议添加一个队列,但手动提交消耗的偏移量。否则应用程序重置可能会导致数据丢失(因为消息已从主题中使用,尽管没有打印到 UI)。

建议阅读KafkaConsumer javadoc的“手动偏移量控制”和“在Kafka外部存储偏移量”部分。