Kafka消费者属性从一个topic开始

Kafka consumer properties from the beginning in a topic

我正在尝试编写一个 Kafka 消费者从头开始消费消息。我可以使用 --from-beginning

从控制台消费者那里做同样的事情

但是我在JAVAAPI中找不到相应的属性。

 def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }

还有一个问题,关于 Avro 消息的 value.deserializer 应该是什么?

kafka-console-consumer 中使用的影响 --from-beginning 可以通过将 auto.offset.reset 设置为 earliest 来实现。与 unique/new group.id 结合使用具有相同的效果。

基本上,您想创建一个新的消费者组(通过 group.id),由于 Kafka Broker 不知道这个消费者组,它会根据配置自动重置这个消费者组的偏移量 auto.offset.reset.当设置为 earliest 时,它将从头开始。当设置为 latest 时,它等待新的传入数据。

关于 Avro 反序列化,这 here 可能会有所帮助。