Spring Kafka 总是在 5 分钟后重新平衡,即使我暂停消费者

Spring Kafka Always rebalance after 5 min even i pause consumer

有一个耗时的操作(大约 10 分钟),但是 kafka 在 5 分钟后失去了重新平衡,即使我暂停了消费者。消费者方法:

    @KafkaListener(topics = {TopicAppoint.EXECUTE_SCHOOL_DATA_STATICS_TASK})
public void receiveMessage(@Payload String payload, Consumer<String, String> consumer) {

    Set<TopicPartition> assignment = consumer.assignment();
    consumer.pause(assignment);

    if (StringUtils.isNotEmpty(payload)) {
        SchoolStatisticsTaskDTO staticsTaskDTO = JSONObject.parseObject(payload, SchoolStatisticsTaskDTO.class);
        Optional<SchoolStatisticsTaskDO> taskOptional = schoolStatisticsTaskRepository.findById(staticsTaskDTO.getTrackId());
        taskOptional.ifPresent(schoolStaticsTaskDO -> {
            // handler
        });
    }

    consumer.resume(assignment);
}

这是我的配置:

  kafka:
bootstrap-servers: 192.168.0.230:9092
producer:
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
  retries: 3
  properties:
    max.request.size: 12582912
consumer:
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  group-id: dc-fitness-data-consumer-group
  properties:
    max.partition.fetch.bytes: 12582912
  #enable-auto-commit: false
listener:
  ack-mode: record
  concurrency: 6

日志

  13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
   13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
   13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
  13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.220 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#5-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-47, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.225 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] (Re-)joining group

先看一下文档,从pause()方法开始

public void pause(集合分区) here

Suspend fetching from the requested partitions. Future calls to poll(long) will not return any records from these partitions until they have been resumed using resume(Collection). Note that this method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.

所以从上面的描述来看pause()方法会暂停分区获取消息但是不会暂停消费者线程,也就是说消费者线程会做后续的事情poll()请求暂停分区但不会获取任何记录

在您的应用程序中: 分区已暂停,但消费者线程忙于执行耗时操作超过 10 分钟,而没有执行任何轮询请求。

正在检测消费者故障: 因此当轮询停止时,心跳将不会发送到集群

After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown), then no heartbeats will be sent. If a period of the configured session timeout elapses before the server has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.

解决方案: 增加以下属性的超时时间,因为默认时间为 5 分钟,您会看到每 5 分钟重新平衡一次(个人不支持增加超时时间)here

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.

解决方案 2: 如果您需要用更少的时间处理大量数据,请增加更多分区并使用每个线程消耗每个分区(这意味着更多并发),减少数据可以在5分钟内处理