java 中的 kafka 消费者客户端在升级所有 kafka-pods 后无法重新连接到 kubernetes kafka 代理

kafka consumer client in java can't reconnect to kubernetes kafka brokers after all of kafka-pods are upgraded

我使用 spring-kafka(2.2.4.RELEASE) 来消费来自 kafka-server 的消息。 kafka客户端和服务端都部署在k8s集群中。 正常情况下,在kafka broker上生产和消费消息是可以的。 但是升级kafka-brokers后,kafka客户端无法重新连接到broker。

据我所知,当bootstrap-servers为虚拟ip(detail is here)时,kafka客户端重连有一个bug。我的问题和vip bug一样。

我的情况,bootstrap-servers地址是k8skafka服务名:端口,升级kafka-brokers时,真实ip对应的kafka servcie name会变。 所以kafka客户端永远不会重连成功。 我该如何解决这个问题?

环境

Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:19:54Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:11:51Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
> kubectl get svc -o wide -nbingotestdev|grep kafkadev
kafkadev                ClusterIP   None            <none>        9091/TCP                          1y        app=kafkadev
kafkadev-out            NodePort    10.68.206.93    <none>        9091:37142/TCP                    257d      app=kafkadev

> kubectl get pod -o wide -nbingotestdev|grep kafkadev
kafkadev-0                               1/1       Running             0          15h       172.20.10.59    10.171.113.45
kafkadev-1                               1/1       Running             0          15h       172.20.13.95    10.171.113.33
kafkadev-2                               1/1       Running             0          15h       172.20.2.173    10.171.113.62

您必须确保您始终拥有一个静态分配的 IP 集,当消费者获取 bootstrap 服务器时,无论是通过外部 DNS 服务还是使用 k8s,该 IP 集都会作为广告侦听器返回api 客户端直接检查 运行 Kafka 服务,然后获取所有地址以构建您的 bootstrap-server 字符串