Java Kafka Consumer 使用新的 Consumer Group ID 可以正常工作,但是当关闭并重新启动时它不会消耗任何消息
Java Kafka Consumer works fine with new Consumer Group ID but when shutdown and restarted it does not consume any messages
这可能是我的理解不足,但令人沮丧。我有一个 Java Kafka Consumer,当分配一个新创建的 groupId 和 consumerID 时,它会很好地使用消息。但是,当在我的 eclipse IDE 中停止 java 应用程序并使用相同的 groupID 和 consumerID 重新启动它时,它不会提取任何消息。如果我再次关闭该应用程序,并为其分配新的和不同的 groupID/consumerID,它就可以正常工作。谁能帮我弄清楚为什么会这样?
以下配置值
props.put("bootstrap.servers","192.168.5.0:30092,192.168.4.6:30092,192.168.5.8:30092");
props.put("acks", "all");
props.put("retries", 4);
props.put("batch.size", 1000);
props.put("linger.ms", 1);
props.put("buffer.memory", 335544323);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "Router2");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Consumer2");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "300000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
默认情况下,当消费者开始读取消息时,它会提交已成功消费的消息的偏移量,以便它不会再次消费它们。
每次创建新的消费者组时,消费者将从最早的可用偏移量开始消费(因为之前没有为新创建的消费者组提交偏移量)。
After the consumer receives its assignment from the coordinator, it
must determine the initial position for each assigned partition. When
the group is first created, before any messages have been consumed,
the position is set according to a configurable offset reset policy
(auto.offset.reset
). Typically, consumption starts either at the
earliest offset or the latest offset.
有关更多详细信息,请参阅 Confluent 文档中的 Offset Management。
如果你想实现相同的行为,你可以简单地坚持同一个消费者群体,只需将 auto.offset.reset
设置为 earliest
而不是默认值 latest
:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
或
props.put("auto.offset.reset", "smallest")
(取决于你当前是什么版本运行)
这样,您的消费者将始终从最早的可用偏移量开始消费消息。
或者,您可以使用 seekToBeginning()
:
kafkaConsumer.poll(0); // Heartbeat sent
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
ConsumerRecords<String, String> records = kafkaConsumer.poll(5);
请注意,这与可与 Kafka 控制台消费者一起使用的 --from-beginning
标志具有相同的效果。
这可能是我的理解不足,但令人沮丧。我有一个 Java Kafka Consumer,当分配一个新创建的 groupId 和 consumerID 时,它会很好地使用消息。但是,当在我的 eclipse IDE 中停止 java 应用程序并使用相同的 groupID 和 consumerID 重新启动它时,它不会提取任何消息。如果我再次关闭该应用程序,并为其分配新的和不同的 groupID/consumerID,它就可以正常工作。谁能帮我弄清楚为什么会这样?
以下配置值
props.put("bootstrap.servers","192.168.5.0:30092,192.168.4.6:30092,192.168.5.8:30092");
props.put("acks", "all");
props.put("retries", 4);
props.put("batch.size", 1000);
props.put("linger.ms", 1);
props.put("buffer.memory", 335544323);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "Router2");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Consumer2");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "300000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
默认情况下,当消费者开始读取消息时,它会提交已成功消费的消息的偏移量,以便它不会再次消费它们。
每次创建新的消费者组时,消费者将从最早的可用偏移量开始消费(因为之前没有为新创建的消费者组提交偏移量)。
After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. When the group is first created, before any messages have been consumed, the position is set according to a configurable offset reset policy (
auto.offset.reset
). Typically, consumption starts either at the earliest offset or the latest offset.
有关更多详细信息,请参阅 Confluent 文档中的 Offset Management。
如果你想实现相同的行为,你可以简单地坚持同一个消费者群体,只需将 auto.offset.reset
设置为 earliest
而不是默认值 latest
:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
或
props.put("auto.offset.reset", "smallest")
(取决于你当前是什么版本运行)
这样,您的消费者将始终从最早的可用偏移量开始消费消息。
或者,您可以使用 seekToBeginning()
:
kafkaConsumer.poll(0); // Heartbeat sent
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
ConsumerRecords<String, String> records = kafkaConsumer.poll(5);
请注意,这与可与 Kafka 控制台消费者一起使用的 --from-beginning
标志具有相同的效果。