How does a Kafka batch consumer handle long-running record processing?

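I'm using spring-kafka '2.2.7.RELEASE' to create a batch consumer, and I'm trying to understand how consumer rebalancing works when my record processing time exceeds max.poll.interval.ms.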

Here is my configuration.

public Map<String, Object> myBatchConsumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // 300000 ms = 300 seconds (5 minutes) allowed between poll() calls
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
    sapphireKafkaConsumerConfig.setSpecificAvroReader("true");
    return configs;
}

Here is my container factory.

@Bean
public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(myBatchConsumerConfigs()));
    factory.getContainerProperties().setMissingTopicsFatal(false);

    factory.getContainerProperties().setAckMode(AckMode.BATCH);

    factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);

    factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
    factory.setStatefulRetry(true);
    factory.setBatchListener(true);

    factory.setBatchErrorHandler(myBatchConsumerSeekToCurrentErrorHandler);
    factory.getContainerProperties().setConsumerRebalanceListener(myBatchConsumerAwareRebalanceListener);
    factory.setRecoveryCallback(context -> {
        logger.logInfo("In recovery call back for KES Batch Consumer", this.getClass());
        myBatchConsumerDeadLetterRecoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"), (Exception) context.getLastThrowable());
        return null;
    });
    return factory;
}

I've added a custom consumer rebalance listener, as shown here.

@Component
public class MyBatchConsumerAwareRebalanceListener implements ConsumerAwareRebalanceListener {  

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsRevokedBeforeCommit - topic "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsRevokedBeforeCommit", getClass());
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsRevokedAfterCommit - topic "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsRevokedAfterCommit", getClass());
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsAssigned - topic - "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsAssigned", getClass());
    }

}


Here is my consumer, where I've added a 400-second sleep per record, which is greater than max.poll.interval.ms (300000 ms, i.e. 300 seconds):



@KafkaListener(groupId = "TestBatchConsumers", topics = TEST_KES_BATCH_CONSUMER_TOPIC, containerFactory = "myBatchConsumerContainerFactory")
       public void consumeRecords(List<ConsumerRecord<String, Organization>> consumerRecords) {

           long startTime = System.currentTimeMillis();
           System.out.println("Processing started at  "+startTime);
           consumerRecords.forEach(consumerRecord -> {
               System.out.println(
                       "Received consumerRecord on topic" + consumerRecord.topic()+" , partition "+consumerRecord.partition()
                               + ", at offset " + consumerRecord.offset() + ", with key " + consumerRecord.key() );


               try {
                   Thread.sleep(400000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           });

           System.out.println("Processing completed at "+ System.currentTimeMillis());

           long processingTimeInSec = (System.currentTimeMillis() - startTime)/1000 ;
           System.out.println(processingTimeInSec);
       }

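Now I'm expecting the consumer group to rebalance, because the processing time exceeds max.poll.interval.ms, but I don't see any such behavior. Am I missing something here?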
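
To make that expectation concrete, here is a minimal plain-Java sketch of the timing (no Kafka dependency). It assumes the listener sleeps 400 s per record, as in the code above. The batch sizes are hypothetical, and `BatchTimingSketch` with its methods is an illustrative name, not part of spring-kafka. Since `max.poll.records` is not set in the configuration above, a single poll may return up to the Kafka default of 500 records.

```java
import java.time.Duration;

final class BatchTimingSketch {

    /** Total time the listener blocks for one batch when every record takes perRecord. */
    static Duration batchDuration(int recordsInBatch, Duration perRecord) {
        return perRecord.multipliedBy(recordsInBatch);
    }

    /** True when the gap between two poll() calls would exceed max.poll.interval.ms. */
    static boolean exceedsPollInterval(Duration batchDuration, Duration maxPollInterval) {
        return batchDuration.compareTo(maxPollInterval) > 0;
    }

    public static void main(String[] args) {
        Duration maxPollInterval = Duration.ofMillis(300_000); // value from the configuration above
        Duration perRecord = Duration.ofMillis(400_000);       // Thread.sleep(400000) in the listener

        // Hypothetical batch sizes; the real size depends on what poll() returns.
        for (int records : new int[] {1, 3}) {
            Duration total = batchDuration(records, perRecord);
            System.out.println(records + " record(s): listener busy for " + total.getSeconds()
                    + " s, exceeds max.poll.interval.ms: " + exceedsPollInterval(total, maxPollInterval));
        }
    }
}
```

Even a single record keeps the listener busy for longer than the 300 s interval, so the poll timeout should expire during the first batch.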

Any suggestions would be appreciated.

It behaves as expected for me:

@SpringBootApplication
public class So67520619Application {

    public static void main(String[] args) {
        SpringApplication.run(So67520619Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so67520619", "foo");
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67520619").partitions(1).replicas(1).build();
    }

}

@Component
class Listener implements ConsumerAwareRebalanceListener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "so67520619", topics = "so67520619")
    public void listen(List<String> in) throws InterruptedException {
        LOG.info(in.toString());
        Thread.sleep(12000);
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOG.info("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOG.info("Revoked: " + partitions);
    }

}

spring.kafka.consumer.properties.max.poll.interval.ms=10000
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch

2021-05-13 11:13:16.339  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 11:13:16.358  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo]
2021-05-13 11:13:26.416  INFO 20954 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-c9a440bf-9076-4575-813d-3efb0054f5f7 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 11:13:28.365  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 11:13:28.370 ERROR 20954 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
    ... 3 common frames omitted

2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Revoked: [so67520619-0]
2021-05-13 11:13:28.372  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.376  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 11:13:28.376  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.485  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 3: {consumer-so67520619-1-15ce3150-1aa3-4b43-a892-fbd54e8ed919=Assignment(partitions=[so67520619-0])}
2021-05-13 11:13:28.486  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 3
2021-05-13 11:13:28.487  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 11:13:28.487  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 11:13:28.488  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 11:13:28.489  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 11:13:28.490  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 11:13:28.494  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo]
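
The poll-timeout message in the log above names two remedies: increase `max.poll.interval.ms` or reduce `max.poll.records`. As a rough sketch of how one might size them (plain Java, no Kafka dependency): `PollBudget`, the 80% budget, and the 2 s per-record time are assumptions for illustration only, while the 400 s and 300 s figures are the ones from the question.

```java
import java.time.Duration;

final class PollBudget {

    /** Largest max.poll.records whose worst-case processing time fits in budgetPercent of the interval. */
    static long maxRecordsThatFit(Duration maxPollInterval, Duration worstCasePerRecord, int budgetPercent) {
        if (worstCasePerRecord.toMillis() <= 0) {
            throw new IllegalArgumentException("worstCasePerRecord must be at least 1 ms");
        }
        long budgetMs = maxPollInterval.toMillis() * budgetPercent / 100;
        return budgetMs / worstCasePerRecord.toMillis();
    }

    /** Smallest max.poll.interval.ms (rounded up) that keeps the same headroom for a given batch size. */
    static long minPollIntervalMs(long maxPollRecords, Duration worstCasePerRecord, int budgetPercent) {
        long batchMs = maxPollRecords * worstCasePerRecord.toMillis();
        return (batchMs * 100 + budgetPercent - 1) / budgetPercent;
    }

    public static void main(String[] args) {
        int budgetPercent = 80; // assumed safety margin: use at most 80% of the interval

        // Question's numbers: 300 s interval, 400 s per record.
        System.out.println(maxRecordsThatFit(Duration.ofSeconds(300), Duration.ofSeconds(400), budgetPercent));
        System.out.println(minPollIntervalMs(1, Duration.ofSeconds(400), budgetPercent));

        // Hypothetical workload: 10 s interval, 2 s per record.
        System.out.println(maxRecordsThatFit(Duration.ofSeconds(10), Duration.ofSeconds(2), budgetPercent));
    }
}
```

With the question's numbers no batch size fits (the first result is 0), so the interval itself would have to grow: about 500000 ms for one record per poll with 20% headroom.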

EDIT

With 60/65 seconds (poll interval/listener sleep), it still works for me...

2021-05-13 17:24:28.111  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 17:24:28.130  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo, foo, foo]
2021-05-13 17:25:28.147  INFO 37063 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-269ac261-3838-4925-a9a7-fd0687db3522 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 17:25:33.135  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 17:25:33.141 ERROR 37063 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

...

2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Revoked: [so67520619-0]
2021-05-13 17:25:33.142  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.145  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 17:25:33.145  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.250  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 9: {consumer-so67520619-1-bd22a252-64f2-4be3-a6eb-8371b8f95ff2=Assignment(partitions=[so67520619-0])}
2021-05-13 17:25:33.254  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 9
2021-05-13 17:25:33.254  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 17:25:33.255  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 17:25:33.256  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 17:25:33.258  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 17:25:33.258  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 17:25:33.261  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo, foo, foo]

Kafka logs the poll-timeout error after 60 seconds, and the commit fails 5 seconds later.
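
Those timestamps can be reproduced with simple arithmetic. The sketch below is plain Java, and `RebalanceTimeline` is an illustrative name. It assumes the poll timer starts roughly when the listener is invoked, that the heartbeat thread sends the LeaveGroup request once `max.poll.interval.ms` elapses, and that the consumer thread only attempts the commit after the listener returns. The 65 s sleep in the second run is inferred from the "60/65 seconds" remark.

```java
import java.time.Duration;
import java.time.LocalTime;

final class RebalanceTimeline {

    /** Approximate time the poll timeout expires: listener start plus max.poll.interval.ms. */
    static LocalTime leaveGroupAt(LocalTime listenerStart, Duration maxPollInterval) {
        return listenerStart.plus(maxPollInterval);
    }

    /** Approximate time the commit is attempted (and fails): listener start plus listener duration. */
    static LocalTime commitAttemptAt(LocalTime listenerStart, Duration listenerDuration) {
        return listenerStart.plus(listenerDuration);
    }

    public static void main(String[] args) {
        // First run: max.poll.interval.ms=10000, Thread.sleep(12000).
        LocalTime first = LocalTime.parse("11:13:16.358");
        System.out.println(leaveGroupAt(first, Duration.ofSeconds(10)));    // log shows 11:13:26.416
        System.out.println(commitAttemptAt(first, Duration.ofSeconds(12))); // log shows 11:13:28.365

        // Second run: 60 s interval, 65 s sleep (assumed from "60/65 seconds").
        LocalTime second = LocalTime.parse("17:24:28.130");
        System.out.println(leaveGroupAt(second, Duration.ofSeconds(60)));    // log shows 17:25:28.147
        System.out.println(commitAttemptAt(second, Duration.ofSeconds(65))); // log shows 17:25:33.135
    }
}
```

In both logs the "Revoked" and "Assigned" lines appear on the consumer thread only after the listener has returned. If the same holds for the question's setup, the rebalance listener output would not show up until the whole batch (400 s per record) has finished, even though the consumer left the group after 300 s.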