java如何从Kafka Consumer一条条获取消息?
How to get messages from Kafka Consumer one by one in java?
我正在使用 Apache Kafka API 并试图一次只获取一条消息。我只写一个主题。我可以通过带有文本框的弹出 UI 屏幕来发送和接收消息。我在文本框中输入一个字符串,然后单击“发送”。我可以发送任意数量的消息。假设我发送了 3 条消息,我的 3 条消息是“嗨”、“哈哈”、“再见”。还有一个“接收”按钮。现在,使用 TutorialsPoint 中的传统代码,当我单击接收按钮时,我会立即在控制台上打印所有 3 条消息(hi、lol、bye)。但是,我只想在单击 UI 上的“接收”时一次打印一条消息。例如,我第一次按下接收按钮时,它会打印“hi”,第二次会是“lol”,第三次会是“bye”。我是卡夫卡的新手,对如何做到这一点感到困惑。我尝试从代码中删除两个循环,所以它只有
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());
如果我只有这两行代码,我第一次按下接收按钮时,它会打印“hi”,但我第二次按下它时,收到消息“attempt to heartbeat failed since group is rebalancing卡夫卡。”当我设置 max.poll.records = 1 时我也遇到了错误,因为我想要我的所有消息 最终 但是当接收按钮时只需要将其中一个记录到控制台被按下。下一次,将记录主题中未记录的下一条消息。
希望这是有道理的!
感谢任何帮助!
提前致谢!
编辑:
包含队列后的新代码,这样我们就可以在发送和接收消息之间交替,并在有新消息时更新队列:
if (payloadQueue.isEmpty()){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
if (records.isEmpty()) {
log.info("No More Records to Add");
consumer.close();
}
else {
records.iterator().forEachRemaining(record -> {
log.info("RECORD: " + record);
payloadQueue.offer(record);
});
payload = payloadQueue.poll().value();
log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
}
}
else {
payload = payloadQueue.poll().value();
log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
}
Kafka使用batch get来提升性能,实在没必要设置max.poll.records=1.
通过一些解决方法可以轻松实现您想要的。
解决方案
你可以有一个Queue
来存储消息,每次按下接收按钮,你从队列中轮询一条消息,如果队列为空,你调用consumer.poll
来填充排队。
代码
private Queue<ConsumerRecord<String,String>> queue=new LinkedList<>();
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
public void buttonPressed(){
if (queue.isEmpty()){
consumer.poll(100).iterator().forEachRemaining(record->queue.offer(record));
}else {
System.out.println(queue.poll());
}
}
您应该按照@haoyu 的建议添加一个队列,但手动提交消耗的偏移量。否则应用程序重置可能会导致数据丢失(因为消息已从主题中使用,尽管没有打印到 UI)。
建议阅读KafkaConsumer javadoc的“手动偏移量控制”和“在Kafka外部存储偏移量”部分。
我正在使用 Apache Kafka API 并试图一次只获取一条消息。我只写一个主题。我可以通过带有文本框的弹出 UI 屏幕来发送和接收消息。我在文本框中输入一个字符串,然后单击“发送”。我可以发送任意数量的消息。假设我发送了 3 条消息,我的 3 条消息是“嗨”、“哈哈”、“再见”。还有一个“接收”按钮。现在,使用 TutorialsPoint 中的传统代码,当我单击接收按钮时,我会立即在控制台上打印所有 3 条消息(hi、lol、bye)。但是,我只想在单击 UI 上的“接收”时一次打印一条消息。例如,我第一次按下接收按钮时,它会打印“hi”,第二次会是“lol”,第三次会是“bye”。我是卡夫卡的新手,对如何做到这一点感到困惑。我尝试从代码中删除两个循环,所以它只有
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());
如果我只有这两行代码,我第一次按下接收按钮时,它会打印“hi”,但我第二次按下它时,收到消息“attempt to heartbeat failed since group is rebalancing卡夫卡。”当我设置 max.poll.records = 1 时我也遇到了错误,因为我想要我的所有消息 最终 但是当接收按钮时只需要将其中一个记录到控制台被按下。下一次,将记录主题中未记录的下一条消息。
希望这是有道理的! 感谢任何帮助! 提前致谢!
编辑: 包含队列后的新代码,这样我们就可以在发送和接收消息之间交替,并在有新消息时更新队列:
if (payloadQueue.isEmpty()){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
if (records.isEmpty()) {
log.info("No More Records to Add");
consumer.close();
}
else {
records.iterator().forEachRemaining(record -> {
log.info("RECORD: " + record);
payloadQueue.offer(record);
});
payload = payloadQueue.poll().value();
log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
}
}
else {
payload = payloadQueue.poll().value();
log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
}
Kafka使用batch get来提升性能,实在没必要设置max.poll.records=1.
通过一些解决方法可以轻松实现您想要的。
解决方案
你可以有一个Queue
来存储消息,每次按下接收按钮,你从队列中轮询一条消息,如果队列为空,你调用consumer.poll
来填充排队。
代码
private Queue<ConsumerRecord<String,String>> queue=new LinkedList<>();
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
public void buttonPressed(){
if (queue.isEmpty()){
consumer.poll(100).iterator().forEachRemaining(record->queue.offer(record));
}else {
System.out.println(queue.poll());
}
}
您应该按照@haoyu 的建议添加一个队列,但手动提交消耗的偏移量。否则应用程序重置可能会导致数据丢失(因为消息已从主题中使用,尽管没有打印到 UI)。
建议阅读KafkaConsumer javadoc的“手动偏移量控制”和“在Kafka外部存储偏移量”部分。