KafkaConsumer Position() 与 Committed()
KafkaConsumer Position() vs Committed()
position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
committed(TopicPartition partition): OffsetAndMetadata
Get the last committed offset for the given partition (whether the commit happened by this process or another).
如果我需要使用特定消费者组的最新提交的偏移量(用于 Spark Structured Streaming 的 startingOffset),我应该使用什么。
我的代码显示已提交已弃用。
val latestOffset = consumer.position(partition)
val last=consumer.committed(partition)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
官方文档:
补偿和消费者地位
Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。实际上有两个与消费者用户相关的位置概念:
消费者的位置给出了下一条将要发出的记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一个。每次消费者在对 poll(long) 的调用中收到消息时,它都会自动前进。
提交的位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这就是消费者将恢复到的偏移量。消费者可以定期自动提交偏移量;或者它可以选择通过调用其中一种提交 API(例如 commitSync 和 commitAsync)来手动控制此提交位置。
您需要在 Spark Streaming 作业中使用 committed 偏移作为 startingOffset。
position
API 的计数器在其运行时由 KafkaConsumer 递增,并且可能与 committed
API 的结果略有不同,因为消费者可能会或可能不会提交偏移量,如果它提交,它可能会异步执行。
在 Kafka 2.4.1 中,方法 committed(partition)
已弃用,建议使用 newer API,它采用 Set
TopicPartitions。它的签名是:
public Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions)
由于您使用的是 Scala,因此需要将您的 Scala 集合转换为 Java 集合。这可以按照 here.
中描述的方式完成
position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
committed(TopicPartition partition): OffsetAndMetadata
Get the last committed offset for the given partition (whether the commit happened by this process or another).
如果我需要使用特定消费者组的最新提交的偏移量(用于 Spark Structured Streaming 的 startingOffset),我应该使用什么。
我的代码显示已提交已弃用。
val latestOffset = consumer.position(partition)
val last=consumer.committed(partition)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
官方文档:
补偿和消费者地位 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。实际上有两个与消费者用户相关的位置概念: 消费者的位置给出了下一条将要发出的记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一个。每次消费者在对 poll(long) 的调用中收到消息时,它都会自动前进。
提交的位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这就是消费者将恢复到的偏移量。消费者可以定期自动提交偏移量;或者它可以选择通过调用其中一种提交 API(例如 commitSync 和 commitAsync)来手动控制此提交位置。
您需要在 Spark Streaming 作业中使用 committed 偏移作为 startingOffset。
position
API 的计数器在其运行时由 KafkaConsumer 递增,并且可能与 committed
API 的结果略有不同,因为消费者可能会或可能不会提交偏移量,如果它提交,它可能会异步执行。
在 Kafka 2.4.1 中,方法 committed(partition)
已弃用,建议使用 newer API,它采用 Set
TopicPartitions。它的签名是:
public Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions)
由于您使用的是 Scala,因此需要将您的 Scala 集合转换为 Java 集合。这可以按照 here.
中描述的方式完成