Kafka消费者:抵消消费者和生产者之间的滞后

Kafka consumer : offset lag between consumer and poducer

我正在编写一个 Kafka 消费者应用程序,其中每个分区都有一个消费者。代码如下

while (true) {  
  ConsumerRecords<String, String> records = consumer.poll(100);   
        for (ConsumerRecord<String, String> record : records) {     
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",        
            record.topic(), record.partition(), record.offset(), record.key(), record.value());   
 }    
  consumer.commitAsync(new OffsetCommitCallback() {       
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {  
           if (e != null)              
              log.error("Commit failed for offsets {}", offsets, e); }}); 
}

有没有办法以编程方式访问和打印消费者滞后偏移量,或者说读取的最后一条记录的偏移量之间的位置差异消费者和某个生产者写入该消费者分区的最后一条记录的偏移量。

我应该在上面添加什么语句才能得到滞后偏移值,知道我的最终目标是将这个值发送给prometheus进行监控吗?

如果您的目标是在普罗米修斯中获取数据,您应该使用消费者发出的 records-lag 指标。

在文档 Consumer Fetch Metrics section 的最后 table 中,您可以看到 Consumers 根据 topic-partition.

发出滞后(和领先)

默认情况下,指标是通过 JMX 发出的,因此您可以使用 Prometheus' JMX exporter 来完成这个工作。

您可以使用多种选项来监控您的消费者延迟。

使用 KafkaConsumer 的 endOffsets API

根据 KafkaConsumer 上的 JavaDocs,您可以使用 endOffsets 方法“获取给定分区的结束偏移量”。

使用 JMX 指标

您可以通过使用 JMX 指标来利用 Kafka 的监控功能。在 Monitoring Kafka 上的文档中解释了如何获取“消费者落后于生产者的消息数。由消费者而非代理发布。”