java 中的 kafka 消费者客户端在升级所有 kafka-pods 后无法重新连接到 kubernetes kafka 代理
kafka consumer client in java can't reconnect to kubernetes kafka brokers after all of kafka-pods are upgraded
我使用 spring-kafka(2.2.4.RELEASE) 来消费来自 kafka-server 的消息。
kafka客户端和服务端都部署在k8s集群中。
正常情况下,在kafka broker上生产和消费消息是可以的。
但是升级kafka-brokers后,kafka客户端无法重新连接到broker。
据我所知,当bootstrap-servers
为虚拟ip(detail is here)时,kafka客户端重连有一个bug。我的问题和vip bug一样。
我的情况,bootstrap-servers
地址是k8skafka服务名:端口,升级kafka-brokers时,真实ip对应的kafka servcie name会变。
所以kafka客户端永远不会重连成功。
我该如何解决这个问题?
环境
- kubectl 版本
Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:19:54Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:11:51Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
- kafka版本:kafka_2.12-2.3.1
- kafka部署信息:
> kubectl get svc -o wide -nbingotestdev|grep kafkadev
kafkadev ClusterIP None <none> 9091/TCP 1y app=kafkadev
kafkadev-out NodePort 10.68.206.93 <none> 9091:37142/TCP 257d app=kafkadev
> kubectl get pod -o wide -nbingotestdev|grep kafkadev
kafkadev-0 1/1 Running 0 15h 172.20.10.59 10.171.113.45
kafkadev-1 1/1 Running 0 15h 172.20.13.95 10.171.113.33
kafkadev-2 1/1 Running 0 15h 172.20.2.173 10.171.113.62
- kafka客户端配置:
- 版本 1:
bootstrap-servers = kafkadev:9091
- 版本 2:
bootstrap-servers = 10.68.206.93:9091
- 当 kafka 服务器正常时都工作成功,并且在升级 kafka 服务器 pods 后重新连接失败。
您必须确保您始终拥有一个静态分配的 IP 集,当消费者获取 bootstrap 服务器时,无论是通过外部 DNS 服务还是使用 k8s,该 IP 集都会作为广告侦听器返回api 客户端直接检查 运行 Kafka 服务,然后获取所有地址以构建您的 bootstrap-server 字符串
我使用 spring-kafka(2.2.4.RELEASE) 来消费来自 kafka-server 的消息。 kafka客户端和服务端都部署在k8s集群中。 正常情况下,在kafka broker上生产和消费消息是可以的。 但是升级kafka-brokers后,kafka客户端无法重新连接到broker。
据我所知,当bootstrap-servers
为虚拟ip(detail is here)时,kafka客户端重连有一个bug。我的问题和vip bug一样。
我的情况,bootstrap-servers
地址是k8skafka服务名:端口,升级kafka-brokers时,真实ip对应的kafka servcie name会变。
所以kafka客户端永远不会重连成功。
我该如何解决这个问题?
环境
- kubectl 版本
Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:19:54Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:11:51Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
- kafka版本:kafka_2.12-2.3.1
- kafka部署信息:
> kubectl get svc -o wide -nbingotestdev|grep kafkadev
kafkadev ClusterIP None <none> 9091/TCP 1y app=kafkadev
kafkadev-out NodePort 10.68.206.93 <none> 9091:37142/TCP 257d app=kafkadev
> kubectl get pod -o wide -nbingotestdev|grep kafkadev
kafkadev-0 1/1 Running 0 15h 172.20.10.59 10.171.113.45
kafkadev-1 1/1 Running 0 15h 172.20.13.95 10.171.113.33
kafkadev-2 1/1 Running 0 15h 172.20.2.173 10.171.113.62
- kafka客户端配置:
- 版本 1:
bootstrap-servers = kafkadev:9091
- 版本 2:
bootstrap-servers = 10.68.206.93:9091
- 当 kafka 服务器正常时都工作成功,并且在升级 kafka 服务器 pods 后重新连接失败。
- 版本 1:
您必须确保您始终拥有一个静态分配的 IP 集,当消费者获取 bootstrap 服务器时,无论是通过外部 DNS 服务还是使用 k8s,该 IP 集都会作为广告侦听器返回api 客户端直接检查 运行 Kafka 服务,然后获取所有地址以构建您的 bootstrap-server 字符串