Kafka 消费者进入无休止的循环
Kafka consumer goes into an unending loop
我正在使用 Kafka 队列来保存一些对象,这些对象将由消费者应用程序检索并对其执行一些操作。
问题:如果消费者的处理时间超过 ~2 小时,kafka 似乎一次又一次地返回同一个对象
代码:
private static Queue queue = new LinkedList();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
System.out.println("\n\n\n Kafka has :[" + record.offset());
queue.add(record.value());
}
System.out.println("\n\n\n Kafka has :[" + records.count());
if (queue != null) {
maintainQueue();
}
}
我正在使用 kafka 版本 0.10.1.0
我们已经解决了这个问题
通过更新
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false"
并在 consumer.poll(Long.MAX_VALUE);
之后添加 consumer.commitSync();
参考:
Kafka - How to commit offset after every message using High-Level consumer?
http://www.slideshare.net/jjkoshy/offset-management-in-kafka
我正在使用 Kafka 队列来保存一些对象,这些对象将由消费者应用程序检索并对其执行一些操作。
问题:如果消费者的处理时间超过 ~2 小时,kafka 似乎一次又一次地返回同一个对象
代码:
private static Queue queue = new LinkedList();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
System.out.println("\n\n\n Kafka has :[" + record.offset());
queue.add(record.value());
}
System.out.println("\n\n\n Kafka has :[" + records.count());
if (queue != null) {
maintainQueue();
}
}
我正在使用 kafka 版本 0.10.1.0
我们已经解决了这个问题
通过更新
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false"
并在 consumer.poll(Long.MAX_VALUE);
consumer.commitSync();
参考:
Kafka - How to commit offset after every message using High-Level consumer?
http://www.slideshare.net/jjkoshy/offset-management-in-kafka