Kafka消费者在心跳失败后离开群组

Kafka consumer leaves group after failing to heartbeat

我们正面临一组 kafka 消费者的问题。每当 kafka-cluster 上有 activity 时,比如重启代理(滚动重启)或重启 VM 的 运行 代理,我们的 kafka 消费者 LeaveGroup 在心跳失败后。以下日志准确重复一分钟,对应于应用程序代码中作为消费来自主题

的消息的一部分进行的 commitSync 调用
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Offset commit failed on partition <topic-name> at offset 455700: The coordinator is loading and hence can't process requests.

此间隔对应于在 Kafka 消费者客户端中完成重试的默认时间 commitSync java API.

Post接下来的 5 分钟没有日志。

之后我看到了下面的内容

[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Attempt to heartbeat failed since group is rebalancing
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Member consumer-13-837563e4-49e9-4bd1-aee4-cb21263e176a sending LeaveGroup request to coordinator <broker-host-name> (id: 2147483646 rack: null)
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

在此之后,主题中的消息堆积如山,我们注意到在没有消费者的情况下延迟明显增加。我们重新启动托管消费者的应用程序以重新开始消费。

我们可以做些什么来避免这种情况?应用程序消费者端是否应该做些什么来处理这个问题?

注意:对于这个特定的消费者,我们使用了 apache kafka client 库。我们通常使用 spring-kafka 库来构建我们的消费者。我们使用 apache one,因为我们想使用我们使用的 spring-kafka 版本不支持的 kafka 消费者的 pauseresume 功能

当您提到您正在 kafka 代理上进行活动时,即重新启动 VM(这些应该是 kafka 服务而不是容器的受控重新启动)。我的意思是,如果你想要在维护期间持续消耗,你必须考虑以下 -

  • 必须以 滚动重启方式click here for details
  • 拉下 Kafka 代理进行维护
  • 以上建议一次一个或取决于集群配置中的 ISR 设置
  • 分区数和复制因子必须大于 1 就好像代理停机进行维护一样,您的主题不应该有离线分区,这会导致 producer/consumer 失败,进而导致数据丢失

个人建议可以将controller拉下来作为滚动重启的最后一个,避免多次controller切换和重新加载集群元数据

当我们在维护 activity 后对每个代理进行滚动重启时,代理需要一些时间才能启动,即重新填充分区元数据和所有复制不足的分区所花费的时间到 return 到 0(这非常重要,以免多次重启给控制器带来压力,因为多个复制分区会导致 offline/unavailable 主题分区,具体取决于您的配置)

除上述之外,您绝对可以调整以下消费者配置 -

  • heartbeat.interval.ms - 必须低于 session.timeout.ms
  • session.timeout.ms
  • max.poll.interval.ms 以上可以根据您的连接延迟和 kafka 集群状态进行调整 您可以在 Confluent Docs
  • 上阅读更多关于它们的信息

也有可能在集群维护 activity 发生时,被指定为分区领导者的 broker 响应时间明显大于 session.timeout.ms max.poll,消费者停止重试。因此,调整消费者配置并在集群操作中保持理智是健康和持续的 kafka 集成的关键。*

注意 - 个人认为,已经完成吞吐量超过 1Gpbs 的集群 upgrades/maintenance 活动,我们不会面临消耗问题 (预计请求 handler/network 处理程序延迟会出现峰值因为重新平衡)。 谨记上述免责声明并谨慎执行,更新更容易但绝对耗时,因为它们将以时尚的方式连续执行

有关集群维护和消费者行为调整的文档的更多帮助 -

终于找到了问题的根源。 kafkaConsumer#commitSync 抛出未经检查的异常,TimeOutException,因为新组协调器在 commitSync 遇到错误时重试的一分钟内未完成加载偏移量。

我没有处理这个暂时性错误。使调试变得困难的原因是我从主线程中生成了线程的使用者。消费者线程中没有异常处理,我也没有检查主线程中的未来对象。结果 TimeOutException 也没有被记录。