检查 Kafka Consumer 是否没有任何记录 return 并且在 java 中为空的好方法?
Good way to check if Kafka Consumer doesn't have any records to return and is empty in java?
我正在使用 Apache KafkaConsumer。我想在不进行轮询的情况下检查消费者是否有任何消息发送给 return。如果我轮询消费者并且没有任何消息,那么即使我有 records.isEmpty()
子句,我也会在无限循环中收到消息“尝试心跳失败,因为组正在重新平衡”,直到超时到期。这是我的代码片段:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
if (records.isEmpty()) {
log.info("No More Records");
consumer.close();
}
else {
records.iterator().forEachRemaining(record -> log.info("RECORD: " + record);
);
这工作正常,直到 records
为空。一旦为空,就会多次记录“Attempt to heartbeat failed since the group is rebalancing”,记录一次“No More Records”,然后继续记录心跳错误。我能做些什么来解决这个问题以及如何优雅地检查(没有任何心跳消息)没有更多的记录要轮询?
编辑:我问了另一个问题,完整的代码和上下文在此link:
提前致谢!
Out of comment: "Since I have a UI and want to receive a message one by one by clicking the "receive" button, there might be a case when there are no more messages to be polled."
在那种情况下,每次有人点击“接收”按钮然后关闭它时,您都需要创建一个新的 KafkaConsumer
。
如果您想在客户端的生命周期内使用同一个 KafkaConsumer,您需要让代理知道它仍然存在(通过发送心跳,这是通过调用 poll 方法隐式完成的)。否则,正如您已经经历过的那样,代理认为您的 KafkaConsumer 已死,并将启动重新平衡。由于没有其他可用的活跃消费者,此重新平衡不会停止。
我正在使用 Apache KafkaConsumer。我想在不进行轮询的情况下检查消费者是否有任何消息发送给 return。如果我轮询消费者并且没有任何消息,那么即使我有 records.isEmpty()
子句,我也会在无限循环中收到消息“尝试心跳失败,因为组正在重新平衡”,直到超时到期。这是我的代码片段:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
if (records.isEmpty()) {
log.info("No More Records");
consumer.close();
}
else {
records.iterator().forEachRemaining(record -> log.info("RECORD: " + record);
);
这工作正常,直到 records
为空。一旦为空,就会多次记录“Attempt to heartbeat failed since the group is rebalancing”,记录一次“No More Records”,然后继续记录心跳错误。我能做些什么来解决这个问题以及如何优雅地检查(没有任何心跳消息)没有更多的记录要轮询?
编辑:我问了另一个问题,完整的代码和上下文在此link:
提前致谢!
Out of comment: "Since I have a UI and want to receive a message one by one by clicking the "receive" button, there might be a case when there are no more messages to be polled."
在那种情况下,每次有人点击“接收”按钮然后关闭它时,您都需要创建一个新的 KafkaConsumer
。
如果您想在客户端的生命周期内使用同一个 KafkaConsumer,您需要让代理知道它仍然存在(通过发送心跳,这是通过调用 poll 方法隐式完成的)。否则,正如您已经经历过的那样,代理认为您的 KafkaConsumer 已死,并将启动重新平衡。由于没有其他可用的活跃消费者,此重新平衡不会停止。