Kafka Ensure At Least Once
First project with Kafka, trying to prove that an event will be processed at least once. So far I have not seen any evidence of processing being retried.
The structure of the dummy application is simple: subscribe, process, publish, commit; on exception, abort the transaction and hope for a retry. I am logging every message.
I expected to see (1) "processing messageX", (2) "error for messageX", (3) "processing messageX". Instead, processing continues past messageX, i.e. it does not get re-processed.
What I actually see is: (1) "processing messageX", (2) "error for messageX", (3) "processing someOtherMessage".
Using Kafka 2.7.0, Scala 2.12.
What am I missing? The relevant parts of the dummy application are shown below.
I also tried removing the producer from the code (and all references to it).
UPDATE 1: I managed to get the records re-processed by using consumer.seek() to the offset, i.e. sending the consumer back to the beginning of the batch of records. Not sure why simply never reaching consumer.commitSync() (because of the exception) did not already do that.
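For reference, a minimal sketch of the seek-based rewind from UPDATE 1, assuming records and consumer as in the listing below (illustrative, not the original code):

  // Rewind every partition of the current batch back to its first record,
  // so the next poll() returns the same records again.
  records.partitions().asScala.foreach { partition =>
    val firstOffset = records.records(partition).get(0).offset()
    consumer.seek(partition, firstOffset)
  }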
import com.myco.somepackage.{MyEvent, KafkaConfigTxn}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.slf4j.LoggerFactory

import java.util
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// Prove that a message can be re-processed if there is an exception
object TopicDrainApp {

  private val logger = LoggerFactory.getLogger(this.getClass)
  private val subTopic = "input.topic"
  private val pubTopic = "output.topic"

  val producer = new KafkaProducer[String, String](KafkaConfigTxn.producerProps)
  producer.initTransactions()
  val consumer = new KafkaConsumer[String, String](KafkaConfigTxn.consumerProps)

  private var lastEventMillis = System.currentTimeMillis
  private val pollIntervalMillis = 1000
  private val pollDuration = java.time.Duration.ofMillis(pollIntervalMillis)

  def main(args: Array[String]): Unit = {
    subscribe(subTopic)
  }

  def subscribe(subTopic: String): Unit = {
    consumer.subscribe(util.Arrays.asList(subTopic))
    while (System.currentTimeMillis - lastEventMillis < 5000L) {
      try {
        val records: ConsumerRecords[String, String] = consumer.poll(pollDuration)
        records.asScala.foreach { record =>
          try {
            lastEventMillis = System.currentTimeMillis
            val event = MyEvent.deserialize(record.value())
            logger.info("ReceivedMyEvent:" + record.value())
            producer.beginTransaction()
            simulateProcessing(event) // [not shown] throw exception to test re-processing
            producer.flush()
            val offsetsToCommit = getOffsetsToCommit(records)
            //consumer.commitSync() // tried this; does not work
            //producer.sendOffsetsToTransaction(offsetsToCommit, "group1") // tried this; does not work
            producer.commitTransaction()
          } catch {
            case e: KafkaException =>
              logger.error(s"rollback ${record.value()}", e)
              producer.abortTransaction()
          }
        }
      } catch {
        case NonFatal(e) => logger.error(e.getMessage, e)
      }
    }
  }

  private def getOffsetsToCommit(records: ConsumerRecords[String, String]): util.Map[TopicPartition, OffsetAndMetadata] = {
    records.partitions().asScala.map { partition =>
      val partitionedRecords = records.records(partition)
      val offset = partitionedRecords.get(partitionedRecords.size - 1).offset
      (partition, new OffsetAndMetadata(offset + 1))
    }.toMap.asJava
  }
}
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import scala.collection.JavaConverters._

object KafkaConfigTxn {
  // Only relevant properties are shown
  def commonProperties: Properties = {
    val props = new Properties()
    props.put(CommonClientConfigs.CLIENT_ID_CONFIG, "...")
    props.put(CommonClientConfigs.GROUP_ID_CONFIG, "...")
    props
  }

  def producerProps: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // "enable.idempotence"
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "...") // "transactional.id"
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.RETRIES_CONFIG, "3")
    commonProperties.asScala.foreach { case (k, v) => props.put(k, v) }
    props
  }

  def consumerProps: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") // "isolation.level"
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    commonProperties.asScala.foreach { case (k, v) => props.put(k, v) }
    props
  }
}
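simulateProcessing is marked [not shown] above; a hypothetical stand-in that fails for one specific payload, just to exercise the abortTransaction() path, could look like this (the trigger condition and exception choice are assumptions, not the original helper):

  // Hypothetical helper: throw for one particular payload so the catch/abort branch runs.
  private def simulateProcessing(event: MyEvent): Unit = {
    if (event.toString.contains("messageX"))
      throw new KafkaException(s"simulated failure for $event")
    logger.info(s"processed $event")
  }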
Per the reference I gave you, you need to use sendOffsetsToTransaction in this flow. However, your consumer will not receive the messages of an aborted transaction again, because you only read committed transactions (isolation.level=read_committed).
Transactions were introduced to allow exactly-once processing between Kafka and Kafka; Kafka has supported at-least-once and at-most-once delivery semantics from day one.
To get at-least-once behavior, you disable auto-commit and commit only when processing completes successfully. That way, the next time you call poll(), if an exception occurred before the commit, you will read the records from the last committed offset again.
To get at-most-once behavior, you commit before processing starts; if an exception occurs, the next time you call poll() you receive new messages (but you lose the ones whose processing failed).
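A minimal sketch of where the commit goes for each of the two semantics, assuming a KafkaConsumer with enable.auto.commit=false, the JavaConverters import from the listings above, and a hypothetical process() function:

  // At-least-once: commit only after processing succeeds.
  // A failure before commitSync() means the same records are re-read from the last committed offset.
  val batch = consumer.poll(java.time.Duration.ofSeconds(1))
  batch.asScala.foreach(record => process(record)) // may throw
  consumer.commitSync()

  // At-most-once: commit first, then process.
  // A failure after commitSync() means the failed records are never seen again.
  val batch2 = consumer.poll(java.time.Duration.ofSeconds(1))
  consumer.commitSync()
  batch2.asScala.foreach(record => process(record)) // may throw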
Exactly once is the hardest to achieve with the plain Java clients (not talking about the Spring framework, which makes everything easier): it involves saving the offsets to an external database (usually the same place where your processing output lands) and reading them back from there on startup/rebalance.
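A minimal sketch of that external-store pattern, reusing the consumer and imports from the listings above; loadOffsetFromDb, process and saveResultAndOffsetInOneDbTransaction are hypothetical helpers backed by the external database:

  import org.apache.kafka.clients.consumer.ConsumerRebalanceListener

  // Position the consumer from offsets kept in the external database,
  // not from Kafka's committed offsets.
  consumer.subscribe(util.Arrays.asList("input.topic"), new ConsumerRebalanceListener {
    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = ()
    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
      partitions.asScala.foreach(tp => consumer.seek(tp, loadOffsetFromDb(tp)))
  })

  val batch = consumer.poll(pollDuration)
  batch.asScala.foreach { record =>
    // Store the result and the next offset in one database transaction,
    // so a crash cannot separate "processed" from "offset advanced".
    val result = process(record)
    saveResultAndOffsetInOneDbTransaction(result, new TopicPartition(record.topic(), record.partition()), record.offset() + 1)
  }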
For an example of using transactions in Java, you can read the excellent guide on Baeldung.
Figured out the right combination of method calls (subscribe, begin transaction, process, commit/abort transaction, etc.) for the demo app. The core of the code is:
def readProcessWrite(subTopic: String, pubTopic: String): Int = {
  var lastEventMillis = System.currentTimeMillis
  val consumer = createConsumer(subTopic)
  val producer = createProducer()
  val groupMetadata = consumer.groupMetadata()
  var numRecords = 0

  while (System.currentTimeMillis - lastEventMillis < 10000L) {
    try {
      val records: ConsumerRecords[String, String] = consumer.poll(pollDuration)
      val offsetsToCommit = getOffsetsToCommit(records)
      // println(s">>> PollRecords: ${records.count()}")
      records.asScala.foreach { record =>
        val currentOffset = record.offset()
        try {
          numRecords += 1
          lastEventMillis = System.currentTimeMillis
          println(s">>> Topic: $subTopic, ReceivedEvent: offset=${record.offset()}, key=${record.key()}, value=${record.value()}")
          producer.beginTransaction()
          val eventOut = simulateProcessing(record.value()) // may throw
          publish(producer, pubTopic, eventOut)
          producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata)
          consumer.commitSync()
          producer.commitTransaction()
        } catch {
          case e: KafkaException =>
            println(s"---------- rollback ${record.value()}: $e")
            producer.abortTransaction()
            // rewind each partition of the batch to the failed offset so it is polled again
            offsetsToCommit.asScala.foreach { case (topicPartition, _) =>
              consumer.seek(topicPartition, currentOffset)
            }
        }
      }
    } catch {
      case NonFatal(e) => logger.error(e.getMessage, e)
    }
  }

  consumer.close()
  producer.close()
  numRecords
}
// Consumer created with props.put("max.poll.records", "1")
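The createConsumer/createProducer helpers are not shown; a plausible sketch, reusing KafkaConfigTxn from the question plus the max.poll.records setting mentioned in the comment above (these bodies are assumptions, not the original helpers):

  private def createConsumer(topic: String): KafkaConsumer[String, String] = {
    val props = KafkaConfigTxn.consumerProps
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1") // "max.poll.records"
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    consumer
  }

  private def createProducer(): KafkaProducer[String, String] = {
    val producer = new KafkaProducer[String, String](KafkaConfigTxn.producerProps)
    producer.initTransactions() // must be called once before the first beginTransaction()
    producer
  }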
I was able to verify that this processes each event exactly once, even when simulateProcessing() throws an exception. To be precise: when processing works, each event is processed only once; when there is an exception, the event is re-processed until it succeeds. In my case there was no real cause for the exceptions, so re-processing always ended in success.