KafkaConsumer Position() 与 Committed()

KafkaConsumer Position() vs Committed()

position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).

committed(TopicPartition partition): OffsetAndMetadata  

Get the last committed offset for the given partition (whether the commit happened by this process or another).

如果我需要使用特定消费者组的最新提交的偏移量(用于 Spark Structured Streaming 的 startingOffset),我应该使用什么。

我的代码显示已提交已弃用。

  val latestOffset = consumer.position(partition)
  val last=consumer.committed(partition)

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.4.1</version>
    </dependency>

官方文档:

补偿和消费者地位 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。实际上有两个与消费者用户相关的位置概念: 消费者的位置给出了下一条将要发出的记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一个。每次消费者在对 poll(long) 的调用中收到消息时,它都会自动前进。

提交的位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这就是消费者将恢复到的偏移量。消费者可以定期自动提交偏移量;或者它可以选择通过调用其中一种提交 API(例如 commitSync 和 commitAsync)来手动控制此提交位置。

您需要在 Spark Streaming 作业中使用 committed 偏移作为 startingOffset。

position API 的计数器在其运行时由 KafkaConsumer 递增,并且可能与 committed API 的结果略有不同,因为消费者可能会或可能不会提交偏移量,如果它提交,它可能会异步执行。

在 Kafka 2.4.1 中,方法 committed(partition) 已弃用,建议使用 newer API,它采用 Set TopicPartitions。它的签名是:

public Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions)

由于您使用的是 Scala,因此需要将您的 Scala 集合转换为 Java 集合。这可以按照 here.

中描述的方式完成