Kafka Ensure At Least Once

My first project with Kafka: I am trying to prove that an event will be processed at least once. So far I have not seen any evidence of retry processing. The structure of the dummy app is simple: subscribe, process, publish, commit; on exception, abort the transaction and hope for a retry. I am logging every message.

I expected to see (1) "processing messageX", (2) "error for messageX", (3) "processing messageX" again. Instead, processing continues past messageX, i.e. it never gets re-processed.

What I actually see is: (1) "processing messageX", (2) "error for messageX", (3) "processing someOtherMessage".

Using Kafka 2.7.0 and Scala 2.12. What am I missing? The relevant parts of the dummy app are shown below.

I also tried removing the producer (and all references to it) from the code.

Update 1: I managed to get records re-processed by using consumer.seek() with the offset, i.e. sending the consumer back to the beginning of the batch of records. I am not sure why simply failing to reach consumer.commitSync() (because of the exception) did not already achieve this.
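
For illustration, here is a minimal sketch of that seek-based rewind, with a hypothetical per-record handler passed in (the dummy app itself follows below):

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// Sketch: on any failure, rewind each partition to the first offset of
// the current batch, so the next poll() delivers the same records again.
def processOrRewind(consumer: KafkaConsumer[String, String],
                    records: ConsumerRecords[String, String],
                    handle: ConsumerRecord[String, String] => Unit): Unit = {
  try {
    records.asScala.foreach(handle) // hypothetical handler; may throw
    consumer.commitSync()
  } catch {
    case NonFatal(_) =>
      records.partitions().asScala.foreach { tp =>
        consumer.seek(tp, records.records(tp).get(0).offset())
      }
  }
}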

import com.myco.somepackage.{MyEvent, KafkaConfigTxn}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.slf4j.LoggerFactory
import java.util
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// Prove that a message can be re-processed if there is an exception
object TopicDrainApp {
  private val logger = LoggerFactory.getLogger(this.getClass)
  private val subTopic = "input.topic"
  private val pubTopic = "output.topic"
  val producer = new KafkaProducer[String, String](KafkaConfigTxn.producerProps)
  producer.initTransactions()
  val consumer = new KafkaConsumer[String, String](KafkaConfigTxn.consumerProps)
  private var lastEventMillis = System.currentTimeMillis
  private val pollIntervalMillis = 1000
  private val pollDuration = java.time.Duration.ofMillis(pollIntervalMillis)

  def main(args: Array[String]): Unit = {
    subscribe(subTopic)
  }

  def subscribe(subTopic: String): Unit = {
    consumer.subscribe(util.Arrays.asList(subTopic))
    while (System.currentTimeMillis - lastEventMillis < 5000L) {
      try {
        val records: ConsumerRecords[String, String] = consumer.poll(pollDuration)
        records.asScala.foreach { record =>
          try {
            lastEventMillis = System.currentTimeMillis
            val event = MyEvent.deserialize(record.value())
            logger.info("ReceivedMyEvent:" + record.value())
            producer.beginTransaction()
            simulateProcessing(event) // [not shown] throw exception to test re-processing
            producer.flush()
            val offsetsToCommit = getOffsetsToCommit(records)
            //consumer.commitSync()                                        // tried this; does not work
            //producer.sendOffsetsToTransaction(offsetsToCommit, "group1") // tried this; does not work
            producer.commitTransaction()
          } catch {
            case e: KafkaException =>
              logger.error(s"rollback ${record.value()}", e)
              producer.abortTransaction()
          }
        }
      } catch {
        case NonFatal(e) => logger.error(e.getMessage, e)
      }
    }
  }

  private def getOffsetsToCommit(records: ConsumerRecords[String, String]): util.Map[TopicPartition, OffsetAndMetadata] = {
    records.partitions().asScala.map { partition =>
      val partitionedRecords = records.records(partition)
      val offset = partitionedRecords.get(partitionedRecords.size - 1).offset
      (partition, new OffsetAndMetadata(offset + 1))
    }.toMap.asJava
  }
}

import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import scala.collection.JavaConverters._

object KafkaConfigTxn {
  // Only relevant properties are shown
  def commonProperties: Properties = {
    val props = new Properties()
    props.put(CommonClientConfigs.CLIENT_ID_CONFIG, "...")
    props.put(CommonClientConfigs.GROUP_ID_CONFIG, "...")
    props
  }
  def producerProps: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // "enable.idempotence"
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "...") // "transactional.id"
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.RETRIES_CONFIG, "3")
    commonProperties.asScala.foreach { case (k, v) => props.put(k, v) }
    props
  }
  def consumerProps: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") // "isolation.level"
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    commonProperties.asScala.foreach { case (k, v) => props.put(k, v) }
    props
  }
}

According to the reference I gave you, you need to use sendOffsetsToTransaction in this flow. However, your consumer will not receive the messages of the aborted transaction again, because you only read committed transactions (isolation.level=read_committed).
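
For reference, a minimal sketch of the consume-transform-produce transaction this implies; the output topic name is illustrative:

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Input offsets are committed through the producer, inside the same
// transaction as the output records, so both commit or abort together.
def transformInTransaction(consumer: KafkaConsumer[String, String],
                           producer: KafkaProducer[String, String],
                           records: ConsumerRecords[String, String],
                           offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
  producer.beginTransaction()
  try {
    records.asScala.foreach { r =>
      producer.send(new ProducerRecord("output.topic", r.key(), r.value()))
    }
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata())
    producer.commitTransaction() // outputs and input offsets commit atomically
  } catch {
    case e: Exception =>
      producer.abortTransaction() // neither outputs nor offsets are committed
  }
}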


Transactions were introduced to allow exactly-once processing between Kafka and Kafka; Kafka has supported the at-least-once and at-most-once delivery semantics from day one.

To get at-least-once behavior, you disable auto-commit and commit only once processing has completed successfully. Then, if an exception occurs before the commit, the records since the last committed offset are delivered again once the consumer restarts or rebalances (or, within a still-running consumer, after a consumer.seek(), as you found in your update).
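
A minimal sketch of that at-least-once loop; the handler and poll timeout are illustrative. If process() throws, the exception propagates without a commit, so the records are redelivered when the consumer comes back:

import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// At-least-once: enable.auto.commit=false, commit only after the whole
// batch has been processed. A failure before commitSync() leaves the
// offsets uncommitted, so those records are delivered again later
// (duplicates are possible; losses are not).
def atLeastOnceLoop(consumer: KafkaConsumer[String, String],
                    process: String => Unit): Unit = {
  while (true) {
    val records = consumer.poll(Duration.ofMillis(1000))
    records.asScala.foreach(r => process(r.value())) // may throw
    consumer.commitSync() // reached only if every record succeeded
  }
}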

To get at-most-once behavior, you commit before processing starts. If an exception then occurs, the next call to poll() gives you new messages, and the messages that failed are lost.
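
And the mirror-image at-most-once sketch, under the same illustrative assumptions:

import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// At-most-once: commit BEFORE processing. A record whose processing
// fails after the commit is never redelivered, i.e. it is lost.
def atMostOnceLoop(consumer: KafkaConsumer[String, String],
                   process: String => Unit): Unit = {
  while (true) {
    val records = consumer.poll(Duration.ofMillis(1000))
    if (!records.isEmpty) {
      consumer.commitSync() // offsets saved before any processing
      try records.asScala.foreach(r => process(r.value()))
      catch { case NonFatal(_) => () } // failed records are not retried
    }
  }
}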

Exactly-once is the hardest to achieve in plain Java (not talking about the Spring framework, which makes everything easier): it involves saving the offsets to an external database, usually the same place where your processing results go, and on startup/rebalance reading the offsets from there and seeking to them.
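
A sketch of that external-offsets approach; loadOffset and saveResultAndOffset are hypothetical helpers that must share one database transaction for the result to be exactly-once:

import java.time.Duration
import java.util.{Collection => JCollection}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Exactly-once via external offset storage: the processing result and
// the next offset are written atomically to the database, and positions
// are restored from the database on every assignment.
def exactlyOnceLoop(consumer: KafkaConsumer[String, String],
                    process: String => String): Unit = {
  consumer.subscribe(java.util.Arrays.asList("input.topic"),
    new ConsumerRebalanceListener {
      override def onPartitionsRevoked(ps: JCollection[TopicPartition]): Unit = ()
      override def onPartitionsAssigned(ps: JCollection[TopicPartition]): Unit =
        ps.asScala.foreach(tp => consumer.seek(tp, loadOffset(tp))) // resume from DB
    })
  while (true) {
    consumer.poll(Duration.ofMillis(1000)).asScala.foreach { r =>
      val tp = new TopicPartition(r.topic(), r.partition())
      saveResultAndOffset(process(r.value()), tp, r.offset() + 1) // one DB transaction
    }
  }
}

def loadOffset(tp: TopicPartition): Long = ???                                       // hypothetical
def saveResultAndOffset(result: String, tp: TopicPartition, offset: Long): Unit = ??? // hypothetical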

For a usage example of transactions in Java, you can read Baeldung's excellent guide:

https://www.baeldung.com/kafka-exactly-once

I worked out the correct combination of method calls for the demo app (subscribe, begin transaction, process, commit/abort transaction, etc.). The core of the code is:

  def readProcessWrite(subTopic: String, pubTopic: String): Int = {
    var lastEventMillis = System.currentTimeMillis
    val consumer = createConsumer(subTopic)
    val producer = createProducer()
    val groupMetadata = consumer.groupMetadata()
    var numRecords = 0
    while (System.currentTimeMillis - lastEventMillis < 10000L) {
      try {
        val records: ConsumerRecords[String, String] = consumer.poll(pollDuration)
        val offsetsToCommit = getOffsetsToCommit(records)
        // println(s">>> PollRecords: ${records.count()}")
        records.asScala.foreach { record =>
          val currentOffset = record.offset()
          try {
            numRecords += 1
            lastEventMillis = System.currentTimeMillis
            println(s">>> Topic: $subTopic, ReceivedEvent: offset=${record.offset()}, key=${record.key()}, value=${record.value()}")
            producer.beginTransaction()
            val eventOut = simulateProcessing(record.value()) // may throw
            publish(producer, pubTopic, eventOut)
            producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata)
            consumer.commitSync()
            producer.commitTransaction()
          } catch {
            case e: KafkaException =>
              logger.error(s"---------- rollback ${record.value()}", e)
              producer.abortTransaction()
              offsetsToCommit.asScala.keys.foreach { topicPartition =>
                consumer.seek(topicPartition, currentOffset) // rewind so this record is re-polled
              }
          }
        }
      } catch {
        case NonFatal(e) => logger.error(e.getMessage, e)
      }
    }
    consumer.close()
    producer.close()
    numRecords
  }
// Consumer created with props.put("max.poll.records", "1") so each poll() returns at most one record

I was able to prove that this processes each event exactly once, even when simulateProcessing() throws an exception. To be precise: when processing works correctly, each event is processed exactly once; when there is an exception, the event is re-processed until it succeeds. In my case there was no real cause for the exceptions, so re-processing always ended in success.