Kafka NodePort service is unreachable outside of cluster
I have been trying to deploy Kafka using Helm charts, so I defined a NodePort service for the Kafka pods. I checked the console Kafka producer and consumer with the same host and port and they work properly. However, when I run a Spark application as the data consumer and Kafka as the producer, they are unable to connect to the Kafka service. I used the minikube IP (instead of the node IP) as the host, together with the service's NodePort.
In the Spark logs, though, I can see that the NodePort service resolves the endpoints and that the brokers are discovered with their pod addresses and ports:
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Discovered group coordinator 172.17.0.20:9092 (id: 2147483645 rack: null)
INFO ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Revoking previously assigned partitions []
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] (Re-)joining group
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2147483645 (/172.17.0.20:9092) could not be established. Broker may not be available.
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Group coordinator 172.17.0.20:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2 (/172.17.0.20:9092) could not be established. Broker may not be available.
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 0 (/172.17.0.12:9092) could not be established. Broker may not be available.
How can I change this behavior?
The NodePort service definition looks like this:
kind: Service
apiVersion: v1
metadata:
  name: kafka-service
spec:
  selector:
    app: cp-kafka
    release: my-confluent-oss
  ports:
    - protocol: TCP
      targetPort: 9092
      port: 32400
      nodePort: 32400
  type: NodePort
Spark consumer configuration:
def kafkaParams() = Map[String, Object](
  "bootstrap.servers" -> "192.168.99.100:32400",
  "schema.registry.url" -> "http://192.168.99.100:8081",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "group.id" -> "avro_data",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
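For context, this is roughly how such a kafkaParams() map plugs into a Spark Streaming direct stream (spark-streaming-kafka-0-10). A minimal sketch, where the topic name "avro_topic", the batch interval, and the master setting are assumptions, not taken from the original job:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("avro-consumer").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))

// The consumer contacts bootstrap.servers first, then connects to whatever
// addresses the brokers advertise in their metadata — which is why the logs
// above show pod IPs even though bootstrapping succeeded.
val stream = KafkaUtils.createDirectStream[String, Object](
  ssc,
  PreferConsistent,
  Subscribe[String, Object](Seq("avro_topic"), kafkaParams())
)

stream.foreachRDD(rdd => rdd.foreach(r => println(s"${r.key} -> ${r.value}")))

ssc.start()
ssc.awaitTermination()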
Kafka producer configuration:
props.put("bootstrap.servers", "192.168.99.100:32400")
props.put("client.id", "avro_data")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://192.168.99.100:32500")
All K8s services related to Kafka:
NAME                                     TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
kafka-service                            NodePort    10.99.113.234    <none>        32400:32400/TCP     6m34s
kubernetes                               ClusterIP   10.96.0.1        <none>        443/TCP             27d
my-confluent-oss-cp-kafka                ClusterIP   10.100.156.108   <none>        9092/TCP            102m
my-confluent-oss-cp-kafka-connect        ClusterIP   10.99.78.89      <none>        8083/TCP            102m
my-confluent-oss-cp-kafka-headless       ClusterIP   None             <none>        9092/TCP            102m
my-confluent-oss-cp-kafka-rest           ClusterIP   10.100.152.109   <none>        8082/TCP            102m
my-confluent-oss-cp-ksql-server          ClusterIP   10.96.249.202    <none>        8088/TCP            102m
my-confluent-oss-cp-schema-registry     ClusterIP   10.109.27.45     <none>        8081/TCP            102m
my-confluent-oss-cp-zookeeper            ClusterIP   10.102.182.90    <none>        2181/TCP            102m
my-confluent-oss-cp-zookeeper-headless   ClusterIP   None             <none>        2888/TCP,3888/TCP   102m
schema-registry-service                  NodePort    10.103.100.64    <none>        32500:32500/TCP     33m
zookeeper-np                             NodePort    10.98.180.130    <none>        32181:32181/TCP     53m
I ran into a similar issue while trying to access a Kafka broker (cp-helm-charts) running on minikube from outside the cluster.
Here is how I resolved it. Before installing from the local repository with helm install:
- Edit this file: https://github.com/confluentinc/cp-helm-charts/blob/master/charts/cp-kafka/values.yaml
- Search for nodeport: and change its enabled field to true:

  nodeport:
    enabled: true

- Uncomment these two lines by removing the #:

  "advertised.listeners": |-
    EXTERNAL://${HOST_IP}:$((31090 + ${KAFKA_BROKER_ID}))

- Replace ${HOST_IP} with your minikube IP (enter minikube ip in a terminal to retrieve your k8s host IP, e.g.: 196.169.99.100)
- Replace ${KAFKA_BROKER_ID} with the broker id (if only one broker is running, it is just 0 by default)
- In the end it should look like this:

  "advertised.listeners": |-
    EXTERNAL://196.169.99.100:31090

Now you can access the Kafka broker running in the k8s cluster from outside by pointing bootstrap.servers to 196.169.99.100:31090.
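To sanity-check the fix from outside the cluster, a plain consumer can be pointed straight at the advertised external listener. A minimal sketch: the topic name is an assumption, and StringDeserializer is used here only to confirm connectivity, not to decode the Avro payload:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

val props = new Properties()
// Point at the externally advertised listener, not at the pod address.
props.put("bootstrap.servers", "196.169.99.100:31090")
props.put("group.id", "external-check")
props.put("auto.offset.reset", "earliest")
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("avro_topic")) // assumed topic name
val records = consumer.poll(Duration.ofSeconds(5))
for (r <- records.asScala) println(s"${r.key} -> ${r.value}")
consumer.close()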