我如何测试是否已将偏移量提交给 Kafka
How do I test that an offset has been committed or not to Kafka
我有一个正在读取 Kafka 主题的 Akka Stream Kafka 源。
我有一个简单的任务是允许禁用消息偏移量的提交。提交通常通过调用 commitScaladsl 完成。
我的问题是我不知道如何测试偏移量是否已经提交。
我们通常使用EmbeddedKafka进行测试,但我还没有想出一种方法来询问最后提交的偏移量。
这是我写的测试示例:
"KafkaSource" should {
"consume from a kafka topic and pass the message " in {
val commitToKafka = true
val key = "key".getBytes
val message = "message".getBytes
withRunningKafka {
val source = getKafkaSource(commitToKafka)
val (_, sub) = source
.toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
.run()
val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
messageOpt should not be empty
messageOpt.get.value shouldBe message
}
}
现在我想添加一个检查是否提交的偏移量。
Kafka 通过 TopicName 和 PartitionID 存储偏移量。所以你可以使用.committed()
或.position
方法来检查Kafka消费者的最后提交的偏移量或当前位置。
committed() : 获取给定分区的最后提交的偏移量(无论提交是由这个进程还是另一个进程发生)。
position() : 获取将要获取的下一条记录的偏移量(如果存在具有该偏移量的记录)。
我终于用ConsumerInterceptor解决了,定义为:
class Interceptor extends ConsumerInterceptor[Array[Byte], Array[Byte]] {
override def onConsume(records: ConsumerRecords[Array[Byte], Array[Byte]]): ConsumerRecords[Array[Byte], Array[Byte]] = records
override def onCommit(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
import scala.collection.JavaConverters._
OffsetRecorder.add(offsets.asScala)
}
override def close(): Unit = {}
override def configure(configs: java.util.Map[String, _]): Unit = OffsetRecorder.clear
}
onCommit是commit完成时调用的,这里我只是记录一下。我使用配置方法在每次测试开始时都有空记录。
然后,在为源创建使用者设置时,我将拦截器添加为 属性:
ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(s"localhost:${kafkaPort}")
.withGroupId("group-id")
.withProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "package.of.my.test.Interceptor")
我有一个正在读取 Kafka 主题的 Akka Stream Kafka 源。
我有一个简单的任务是允许禁用消息偏移量的提交。提交通常通过调用 commitScaladsl 完成。
我的问题是我不知道如何测试偏移量是否已经提交。
我们通常使用EmbeddedKafka进行测试,但我还没有想出一种方法来询问最后提交的偏移量。
这是我写的测试示例:
"KafkaSource" should {
"consume from a kafka topic and pass the message " in {
val commitToKafka = true
val key = "key".getBytes
val message = "message".getBytes
withRunningKafka {
val source = getKafkaSource(commitToKafka)
val (_, sub) = source
.toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
.run()
val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
messageOpt should not be empty
messageOpt.get.value shouldBe message
}
}
现在我想添加一个检查是否提交的偏移量。
Kafka 通过 TopicName 和 PartitionID 存储偏移量。所以你可以使用.committed()
或.position
方法来检查Kafka消费者的最后提交的偏移量或当前位置。
committed() : 获取给定分区的最后提交的偏移量(无论提交是由这个进程还是另一个进程发生)。
position() : 获取将要获取的下一条记录的偏移量(如果存在具有该偏移量的记录)。
我终于用ConsumerInterceptor解决了,定义为:
class Interceptor extends ConsumerInterceptor[Array[Byte], Array[Byte]] {
override def onConsume(records: ConsumerRecords[Array[Byte], Array[Byte]]): ConsumerRecords[Array[Byte], Array[Byte]] = records
override def onCommit(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
import scala.collection.JavaConverters._
OffsetRecorder.add(offsets.asScala)
}
override def close(): Unit = {}
override def configure(configs: java.util.Map[String, _]): Unit = OffsetRecorder.clear
}
onCommit是commit完成时调用的,这里我只是记录一下。我使用配置方法在每次测试开始时都有空记录。
然后,在为源创建使用者设置时,我将拦截器添加为 属性:
ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(s"localhost:${kafkaPort}")
.withGroupId("group-id")
.withProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "package.of.my.test.Interceptor")