Kafka - 有什么比 poll() 更好的替代方法来收听 Java 中的主题?
Kafka - What are the better alternatives than poll() to listen to a topic in Java?
我正在尝试在 Java 中创建消费者客户端。我意识到 poll() 函数已贬值。听 Kafka 主题的替代方案是什么?
我的代码:
KafkaConsumer< String, UserSegmentPayload > kc = new KafkaConsumer<>(props2);
kc.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords<String, UserSegmentPayload> records = kc.poll(100);
for (ConsumerRecord<String, UserSegmentPayload> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
不推荐使用 poll()
和 poll(long)
的原因是它们可能会无限期阻塞(即使在指定超时的第二种情况下也是如此)。此行为的根本原因是这些方法中的初始元数据更新可能会永远阻塞(请参阅 here)。相反,您应该使用 KafkaConsumer
的 poll(Duration)
方法。因此,在您的代码中,您所要做的就是将 kc.poll(100)
替换为 kc.poll(Duration.ofMillis(100))
.
我正在尝试在 Java 中创建消费者客户端。我意识到 poll() 函数已贬值。听 Kafka 主题的替代方案是什么?
我的代码:
KafkaConsumer< String, UserSegmentPayload > kc = new KafkaConsumer<>(props2);
kc.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords<String, UserSegmentPayload> records = kc.poll(100);
for (ConsumerRecord<String, UserSegmentPayload> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
不推荐使用 poll()
和 poll(long)
的原因是它们可能会无限期阻塞(即使在指定超时的第二种情况下也是如此)。此行为的根本原因是这些方法中的初始元数据更新可能会永远阻塞(请参阅 here)。相反,您应该使用 KafkaConsumer
的 poll(Duration)
方法。因此,在您的代码中,您所要做的就是将 kc.poll(100)
替换为 kc.poll(Duration.ofMillis(100))
.