Kafka Consumer 偏移量提交检查以避免提交较小的偏移量

Kafka Consumer offset commit check to avoid committing smaller offsets

我们假设我们有一个消费者发送请求以提交偏移量 10。 如果出现沟通问题,经纪人没有收到请求,当然也没有回应。之后我们有另一个消费者处理另一个批次并成功提交偏移量 20.

Q:我想知道有没有办法或者属性来处理,这样我们就可以检查日志中之前的偏移量是否已经提交在我们的例子中提交偏移量 20?

您描述的场景只有在使用异步提交时才会发生。

请记住,一个特定的 TopicPartition 只能由同一 ConsumerGroup 中的单个消费者使用。如果您有两个消费者阅读同一个 TopicPartition,则只可能

  1. 如果他们有不同的 ConsumerGroup,或者
  2. 如果他们有相同的 ConsumerGroup 并且发生再平衡。但是,一次只有一个消费者会读取该 TopicPartition,永远不会同时读取。

案例 #1 非常清楚:如果他们有不同的 ConsumerGroup,他们会并行且独立地使用分区。他们的committed抵消也是分开管理的。

案例 #2:如果第一个消费者因为消费者 failed/died 并且没有恢复而未能提交偏移量 10,则会发生消费者重新平衡,并且另一个活跃的消费者将选择该分区。由于未提交偏移量 10,新消费者将在跳转到下一批之前再次开始读取偏移量 10,并可能提交偏移量 20。这导致“at-least-once”语义并可能导致重复。

现在,来到唯一可以在提交较高偏移量后提交较小偏移量的情况。如开头所述,如果您 异步 提交偏移量(使用 commitAsync),这确实会发生。想象以下场景,按时间排序:

  • 消费者读取偏移量 0(后台线程尝试提交偏移量 0)
  • 提交偏移量 0 成功
  • 消费者读取偏移量 1(后台线程尝试提交偏移量 1)
  • 提交偏移量 1 失败,稍后重试
  • 消费者读取偏移量 2(后台线程尝试提交偏移量 2)
  • 提交偏移量 2 成功
  • 现在,如何处理(re-trying 提交偏移量 1?)

如果你让重试机制再次提交偏移量1,看起来你的消费者只提交到偏移量1。这是因为每个消费者组在最新偏移量par TopicPartition上的信息存储在internal compacted Kafka topic __consumer_offsets 这意味着只存储我们消费者组的最新值(在我们的例子中:偏移量 1)。

在《Kafka - The Definitive Guide》一书中,有一个关于如何缓解这个问题的提示:

Retrying Async Commits: A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don’t retry because a newer commit was already sent.

例如,您可以在下面的 Scala 中看到这个想法的实现:

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]
  

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}