KafkaConsumer 没有从主题中读取所有记录
KafkaConsumer does not read all records from topic
我想测试一个kafka的例子,生产者:
object ProducerApp extends App {
val topic = "topicTest"
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
for(i <- 0 to 125000)
{
val record = new ProducerRecord(topic, "key "+i,new PMessage())
producer.send(record)
}
}
消费者:
object ConsumerApp extends App {
val topic = "topicTest"
val properties = new Properties
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(scala.List(topic).asJava)
while (true) {
consumer.seekToBeginning(consumer.assignment())
val records:ConsumerRecords[String,String] = consumer.poll(20000)
println("records size "+records.count())
}
}
主题 "topicTest" 是用 1 个分区创建的。
预期结果是:
...
records size 125000
records size 125000
records size 125000
records size 125000
...
但是得到的结果是:
...
records size 778
records size 778
records size 778
records size 778
...
消费者没有读取主题中的所有记录。我想了解原因。但是,如果记录数较少(例如 20 条),它可以正常工作并且消费者会读取所有记录。题目大小有限制吗?
Kafka的配置有没有修改允许处理大量记录?
Kafka 1.0.0 有一个 max.poll.records
消费者参数,默认值为 500,因此您无法使用 125000 获得想要的结果。
出于这个原因,它适用于 20,但你得到的结果 778 很奇怪。
我想测试一个kafka的例子,生产者:
object ProducerApp extends App {
val topic = "topicTest"
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
for(i <- 0 to 125000)
{
val record = new ProducerRecord(topic, "key "+i,new PMessage())
producer.send(record)
}
}
消费者:
object ConsumerApp extends App {
val topic = "topicTest"
val properties = new Properties
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(scala.List(topic).asJava)
while (true) {
consumer.seekToBeginning(consumer.assignment())
val records:ConsumerRecords[String,String] = consumer.poll(20000)
println("records size "+records.count())
}
}
主题 "topicTest" 是用 1 个分区创建的。
预期结果是:
...
records size 125000
records size 125000
records size 125000
records size 125000
...
但是得到的结果是:
...
records size 778
records size 778
records size 778
records size 778
...
消费者没有读取主题中的所有记录。我想了解原因。但是,如果记录数较少(例如 20 条),它可以正常工作并且消费者会读取所有记录。题目大小有限制吗? Kafka的配置有没有修改允许处理大量记录?
Kafka 1.0.0 有一个 max.poll.records
消费者参数,默认值为 500,因此您无法使用 125000 获得想要的结果。
出于这个原因,它适用于 20,但你得到的结果 778 很奇怪。