KafkaConsumer 没有从主题中读取所有记录

KafkaConsumer does not read all records from topic

我想测试一个kafka的例子,生产者:

object ProducerApp extends App {

  val topic = "topicTest"
  val  props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  for(i <- 0 to 125000)
  {    
    val record = new ProducerRecord(topic, "key "+i,new PMessage())    
    producer.send(record)       
  }
}

消费者:

object ConsumerApp extends App {
  val topic = "topicTest"  
  val properties = new Properties
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")  
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")  
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](properties)  
  consumer.subscribe(scala.List(topic).asJava)    
  while (true) {
    consumer.seekToBeginning(consumer.assignment())
    val records:ConsumerRecords[String,String] = consumer.poll(20000)
    println("records size "+records.count())    
  }  
}

主题 "topicTest" 是用 1 个分区创建的。

预期结果是:

...
records size 125000
records size 125000
records size 125000
records size 125000
...

但是得到的结果是:

...
records size 778
records size 778
records size 778
records size 778
...

消费者没有读取主题中的所有记录。我想了解原因。但是,如果记录数较少(例如 20 条),它可以正常工作并且消费者会读取所有记录。题目大小有限制吗? Kafka的配置有没有修改允许处理大量记录?

Kafka 1.0.0 有一个 max.poll.records 消费者参数,默认值为 500,因此您无法使用 125000 获得想要的结果。 出于这个原因,它适用于 20,但你得到的结果 778 很奇怪。