即使没有到达 Ack,Kafka 偏移量也会增加

Kafka offset increments even without reaching Ack

我有一个消费者消费一条消息,做一些繁重的工作然后确认。

    @KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory ="ContainerFactory")
  public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {

try {
  //Heavy Job
  ack.acknowledge();
} catch (Exception e) {
  log("Error in Kafka Consumer);
    }
  }

现在如果有异常,它应该进入 catch 块并且确认不应该发生,如果确认没有发生它应该回到队列并再次处理。但它没有发生。偏移量更新并选择下一条消息。 我知道消费者有一个轮询大小,这使它能够一次选择多个消息。但是即使一条消息没有被确认,它也应该重新处理它而不是忽略它并更新偏移量。

这是 Kafka 消费者配置

`Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

这是底层 KafkaConsumer 的预期行为。

在幕后,KafkaConsumer 使用 poll API,在 JavaDocs 中描述为:

"On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions."

这意味着,它不会检查最后的 committed 偏移量,而是检查最后的 consumed 偏移量,然后按顺序获取数据.只有当 re-starting 你的工作才会继续读取该消费者组的最后提交的偏移量,或者如果你使用基于 auto_offset_reset 配置的新消费者组。

为了解决您的问题,我看到您可以在 catch 块中应用以下解决方案:

  • 不要只记录“卡夫卡消费者中的错误”,而是关闭您的工作。修复代码和 re-start 您的应用程序
  • 使用 re-position 您的消费者再次使用 seek API 使用偏移量编号(导致异常)到相同的偏移量。可以找到有关搜索方法的详细信息here