如何从特定偏移量开始在 Java 中使用来自 Kafka 的消息
How to consume messages from Kafka in Java, starting from a specific offset
从最早开始阅读:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
阅读最新的:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
但是如果我想从第 18 次提交开始,我应该使用哪一行?
您可以使用 seek()
来强制消费者从特定的 offset 开始消费:
public void seek(TopicPartition partition, long offset)
Overrides the fetch offsets that the consumer will use on the next poll(timeout)
. If this API is invoked for the
same partition more than once, the latest offset will be used on the
next poll()
. Note that you may lose data if this API is arbitrarily
used in the middle of consumption, to reset the fetch offsets
例如,假设您要从偏移量 18
开始:
TopicPartition tp = new TopicPartition("myTopic", 0);
Long startOffset = 18L
List<TopicPartition> topics = Arrays.asList(tp);
consumer.assign(topics);
consumer.seek(topicPartition, startOffset);
// Then consume messages as normal..
从最早开始阅读:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
阅读最新的:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
但是如果我想从第 18 次提交开始,我应该使用哪一行?
您可以使用 seek()
来强制消费者从特定的 offset 开始消费:
public void seek(TopicPartition partition, long offset)
Overrides the fetch offsets that the consumer will use on the next
poll(timeout)
. If this API is invoked for the same partition more than once, the latest offset will be used on the nextpoll()
. Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
例如,假设您要从偏移量 18
开始:
TopicPartition tp = new TopicPartition("myTopic", 0);
Long startOffset = 18L
List<TopicPartition> topics = Arrays.asList(tp);
consumer.assign(topics);
consumer.seek(topicPartition, startOffset);
// Then consume messages as normal..