KafkaConsumer.commitAsync() behavior with a lower offset than previous

How will Kafka handle a call to

KafkaConsumer.commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

when the offset given for a topic partition is lower than the one committed by a previous call?

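It will simply set the partition's committed offset to the value you specify, so the next time the consumer starts (or the partition is reassigned) it resumes consuming from that committed offset.
The javadoc of commitAsync() says: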

The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

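I was curious and tested it to see the behavior. As the docs describe, what @haoyuwang wrote in his answer is correct (+1).

The reason behind it is simple. The committed offsets of a consumer group are stored in Kafka's internal topic __consumer_offsets. This topic is compacted, which means it is meant to provide the latest value for a given key. In your case the key is the combination of consumer group, topic and partition, and the value is the offset.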

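If you now

  • commit offset 10 and then, because of asynchronous processing, later
  • commit offset 5

offset 5 will be the latest value in the __consumer_offsets topic. A consumer that is already running keeps its in-memory position, but after a restart or a rebalance the consumer resumes reading that topic partition at offset 5 rather than at offset 10, so the messages in between are processed again.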
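The last-write-wins behavior can be illustrated without a broker. The following is only a minimal sketch: the class CompactedOffsetsSketch, its methods and the string key layout are made up for this example and are not part of the Kafka API. It models nothing more than "the newest value per key survives".

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a compacted topic: only the latest value per key is kept.
class CompactedOffsetsSketch {
    private final Map<String, Long> latest = new HashMap<>();

    // Hypothetical key layout (group/topic/partition) chosen for readability only.
    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // No comparison with the previous value: the newest write always wins.
    void commit(String group, String topic, int partition, long offset) {
        latest.put(key(group, topic, partition), offset);
    }

    // Returns the stored offset, or null if nothing was committed for this key.
    Long committed(String group, String topic, int partition) {
        return latest.get(key(group, topic, partition));
    }

    public static void main(String[] args) {
        CompactedOffsetsSketch store = new CompactedOffsetsSketch();
        store.commit("my-group", "myOutputTopic", 0, 10L);
        store.commit("my-group", "myOutputTopic", 0, 5L); // arrives later
        System.out.println(store.committed("my-group", "myOutputTopic", 0));
    }
}
```

Because commit() never compares against the stored value, the later write of 5 replaces the earlier 10, which mirrors what was observed for the real topic.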

How to reproduce

You can reproduce and test this by (synchronously) committing an earlier offset right after your regular commit, like this:

consumer.commitSync();
consumer.commitSync(commitFirstMessage);

where commitFirstMessage is defined as

TopicPartition zeroTopicPartition = new TopicPartition(topic, 0);
OffsetAndMetadata zeroOffset = new OffsetAndMetadata(0L);

Map<TopicPartition, OffsetAndMetadata> commitFirstMessage = new HashMap<>();
commitFirstMessage.put(zeroTopicPartition, zeroOffset);

Edit:

How to avoid committing lower offsets with commitAsync

The book Kafka: The Definitive Guide has a hint on how to avoid committing lower offsets caused by retried calls of commitAsync:

Retrying Async Commits: A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don’t retry because a newer commit was already sent.

An implementation could look like this (not actually tested!):

import java.time.Duration
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import scala.collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]
  

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retry only if the commit failed and no newer commit has incremented the global counter
      if (exception != null && position == atomicLong.get) {
        // resend the same offsets that failed, reusing this callback and its position
        consumer.commitAsync(offsets, this)
      }
    }
  }

}
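
The retry decision from the quoted pattern can also be tried out on its own, without a broker. This is only a sketch under assumptions: CommitSequenceSketch, nextSequence and shouldRetry are names invented for this example, and the actual commit calls are left out, so it shows just the sequence-number comparison.

```java
import java.util.concurrent.atomic.AtomicLong;

// Isolates the "monotonically increasing sequence number" check from the quote.
class CommitSequenceSketch {
    private final AtomicLong latestSequence = new AtomicLong(0);

    // Call once for every new commit that is sent; returns that commit's sequence number.
    long nextSequence() {
        return latestSequence.incrementAndGet();
    }

    // A failed commit may be retried only if no newer commit was sent in the meantime.
    boolean shouldRetry(long sequenceOfFailedCommit) {
        return sequenceOfFailedCommit == latestSequence.get();
    }

    public static void main(String[] args) {
        CommitSequenceSketch guard = new CommitSequenceSketch();
        long first = guard.nextSequence();  // e.g. the commit of offset 5 is sent
        long second = guard.nextSequence(); // e.g. the commit of offset 10 is sent
        System.out.println(guard.shouldRetry(first));  // the older commit must not be retried
        System.out.println(guard.shouldRetry(second)); // the newest commit may be retried
    }
}
```

In a real consumer, the value returned by nextSequence() would be captured by the callback when the commit is sent, and shouldRetry() would be consulted inside onComplete when the exception is not null.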