Kafka消费者属性从一个topic开始
Kafka consumer properties from the beginning in a topic
我正在尝试编写一个 Kafka 消费者从头开始消费消息。我可以使用 --from-beginning
从控制台消费者那里做同样的事情
但是我在JAVAAPI中找不到相应的属性。
def consumeFromKafka(topic: String) = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator)
println(data.value())
}
}
还有一个问题,关于 Avro 消息的 value.deserializer 应该是什么?
kafka-console-consumer
中使用的影响 --from-beginning
可以通过将 auto.offset.reset
设置为 earliest
来实现。与 unique/new group.id
结合使用具有相同的效果。
基本上,您想创建一个新的消费者组(通过 group.id
),由于 Kafka Broker 不知道这个消费者组,它会根据配置自动重置这个消费者组的偏移量 auto.offset.reset
.当设置为 earliest
时,它将从头开始。当设置为 latest
时,它等待新的传入数据。
关于 Avro 反序列化,这 here 可能会有所帮助。
我正在尝试编写一个 Kafka 消费者从头开始消费消息。我可以使用 --from-beginning
从控制台消费者那里做同样的事情但是我在JAVAAPI中找不到相应的属性。
def consumeFromKafka(topic: String) = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator)
println(data.value())
}
}
还有一个问题,关于 Avro 消息的 value.deserializer 应该是什么?
kafka-console-consumer
中使用的影响 --from-beginning
可以通过将 auto.offset.reset
设置为 earliest
来实现。与 unique/new group.id
结合使用具有相同的效果。
基本上,您想创建一个新的消费者组(通过 group.id
),由于 Kafka Broker 不知道这个消费者组,它会根据配置自动重置这个消费者组的偏移量 auto.offset.reset
.当设置为 earliest
时,它将从头开始。当设置为 latest
时,它等待新的传入数据。
关于 Avro 反序列化,这 here 可能会有所帮助。