Kafka NodePort service is unreachable outside of cluster

I have been trying to deploy Kafka with Helm charts, so I defined a NodePort service for the Kafka pods. I checked the console Kafka producer and consumer with the same host and port and they work fine. However, when I run a Spark application as the data consumer and a Kafka producer feeding it, they fail to connect to the Kafka service. I use the minikube ip (not the node ip) as the host together with the service's NodePort. Yet in the Spark logs I can see that the NodePort service resolves the endpoints and the brokers are discovered as pod addresses and ports:

INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Discovered group coordinator 172.17.0.20:9092 (id: 2147483645 rack: null)
INFO ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Revoking previously assigned partitions []
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] (Re-)joining group
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2147483645 (/172.17.0.20:9092) could not be established. Broker may not be available.
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Group coordinator 172.17.0.20:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2 (/172.17.0.20:9092) could not be established. Broker may not be available.
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 0 (/172.17.0.12:9092) could not be established. Broker may not be available.
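
Those 172.17.x.x endpoints are the addresses the brokers advertise back to the client. A quick diagnostic sketch (not part of the original setup) using Kafka's AdminClient against the NodePort address makes this visible from the host:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.AdminClient

// Diagnostic sketch: the nodes returned by describeCluster() are the
// endpoints the brokers advertise, i.e. what the consumer will try to
// connect to after the initial bootstrap.
val adminProps = new Properties()
adminProps.put("bootstrap.servers", "192.168.99.100:32400")
val admin = AdminClient.create(adminProps)
admin.describeCluster().nodes().get().asScala.foreach { node =>
  println(s"broker ${node.id()} advertises ${node.host()}:${node.port()}")
}
admin.close()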

How can I change this behavior?

The NodePort service definition looks like this:

kind: Service
apiVersion: v1
metadata:
  name: kafka-service
spec:
  selector:
    app: cp-kafka
    release: my-confluent-oss
  ports:
    - protocol: TCP
      targetPort: 9092
      port: 32400
      nodePort: 32400
  type: NodePort

Spark consumer configuration:

def kafkaParams() = Map[String, Object](
  "bootstrap.servers" -> "192.168.99.100:32400",
  "schema.registry.url" -> "http://192.168.99.100:8081",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "group.id" -> "avro_data",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
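
For context, here is a minimal sketch of how these params would typically be wired into a Spark Streaming direct stream; the topic name, batch interval and master setting are assumptions, not from the original post:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Hypothetical wiring of kafkaParams() into a direct stream; the topic
// name "avro_topic" and the 5-second batch interval are illustrative.
val conf = new SparkConf().setAppName("avro_data_consumer").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))

val stream = KafkaUtils.createDirectStream[String, Object](
  ssc,
  PreferConsistent,
  Subscribe[String, Object](Seq("avro_topic"), kafkaParams())
)

stream.foreachRDD(rdd => rdd.foreach(record => println(record.value())))
ssc.start()
ssc.awaitTermination()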

Kafka producer configuration:

  props.put("bootstrap.servers", "192.168.99.100:32400")
  props.put("client.id", "avro_data")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://192.168.99.100:32500")
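
For completeness, a sketch of how a producer built from these props might send an Avro record; the topic name and the one-field schema are illustrative assumptions:

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Assumes `props` is the java.util.Properties filled in above; the topic
// name and the one-field Avro schema are purely illustrative.
val schema = SchemaBuilder.record("Measurement").fields()
  .requiredString("id")
  .endRecord()

val avroRecord = new GenericData.Record(schema)
avroRecord.put("id", "sample-1")

val producer = new KafkaProducer[String, AnyRef](props)
producer.send(new ProducerRecord[String, AnyRef]("avro_topic", "key-1", avroRecord))
producer.flush()
producer.close()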

All K8s services related to Kafka:

NAME                                     TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
kafka-service                            NodePort       10.99.113.234    <none>        32400:32400/TCP     6m34s
kubernetes                               ClusterIP      10.96.0.1        <none>        443/TCP             27d
my-confluent-oss-cp-kafka                ClusterIP      10.100.156.108   <none>        9092/TCP            102m
my-confluent-oss-cp-kafka-connect        ClusterIP      10.99.78.89      <none>        8083/TCP            102m
my-confluent-oss-cp-kafka-headless       ClusterIP      None             <none>        9092/TCP            102m
my-confluent-oss-cp-kafka-rest           ClusterIP      10.100.152.109   <none>        8082/TCP            102m
my-confluent-oss-cp-ksql-server          ClusterIP      10.96.249.202    <none>        8088/TCP            102m
my-confluent-oss-cp-schema-registry      ClusterIP      10.109.27.45     <none>        8081/TCP            102m
my-confluent-oss-cp-zookeeper            ClusterIP      10.102.182.90    <none>        2181/TCP            102m
my-confluent-oss-cp-zookeeper-headless   ClusterIP      None             <none>        2888/TCP,3888/TCP   102m
schema-registry-service                  NodePort       10.103.100.64    <none>        32500:32500/TCP     33m
zookeeper-np                             NodePort       10.98.180.130    <none>        32181:32181/TCP     53m

I ran into a similar issue while trying to access a Kafka broker (cp-helm-chart) running on minikube from outside the cluster.

Here is how I solved it, before installing from the local repository with helm install:

  1. Edit the file https://github.com/confluentinc/cp-helm-charts/blob/master/charts/cp-kafka/values.yaml
  2. Search for nodeport: and change its enabled field to true:
     nodeport:
       enabled: true
  3. Uncomment these two lines by removing the #:
     "advertised.listeners": |-
       EXTERNAL://${HOST_IP}:$((31090 + ${KAFKA_BROKER_ID}))
  4. Replace ${HOST_IP} with your minikube ip (run minikube ip in a terminal to retrieve your k8s host ip, e.g. 196.169.99.100).
  5. Replace ${KAFKA_BROKER_ID} with the broker id (if only one broker is running, it is just 0 by default).
  6. It finally looks like this:
     "advertised.listeners": |-
       EXTERNAL://196.169.99.100:31090

Now you can access the Kafka broker running in the k8s cluster from outside by pointing bootstrap.servers to 196.169.99.100:31090.
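
With that change, the consumer from the question only needs its bootstrap address repointed at the externally advertised listener. A sketch of the adjusted params (the schema registry address via its NodePort on the same minikube ip is an assumption based on the services listed in the question):

import org.apache.kafka.common.serialization.StringDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializer

// Only bootstrap.servers has to change; 31090 is the node port advertised
// for broker 0. The schema registry address shown here assumes its NodePort
// (32500) is reachable at the same minikube ip.
def kafkaParams() = Map[String, Object](
  "bootstrap.servers" -> "196.169.99.100:31090",
  "schema.registry.url" -> "http://196.169.99.100:32500",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "group.id" -> "avro_data",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)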