将 spark 流从本地机器连接到 GCP 上的 kafka

connecting spark streaming from local machine to kafka on GCP

我目前在 GCP 上有 4 个虚拟机。 1 是托管我的动物园管理员的地方,另外 3 个是名为 kafka-0、kafka-1、kafka-2 的 kafka-broker。当 VMS 中的 producing/consuming 消息时一切正常,但是当我尝试从本地计算机连接到 Kafka 时它开始失败。首先,我为端口 9092(在所有实例上)打开了防火墙规则。然后我为每个实例添加了一个静态外部 IP。我正在尝试从本地 Spark 流作业连接到我的 kafka 代理。

只是为了完整性检查,我的动物园管理员能够连接到所有代理

i.ie 这个 bash 命令 运行 在我的 zookeeper 上(10.150.0.6:2181 是 zookeeper 的内部 IP)

zookeeper-shell.sh 10.150.0.6:2181 ls /brokers/ids 

给我以下输出

Connecting to 10.150.0.6:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2]

我的第一个方法。我尝试使用代理的外部 IP

连接到其中一个 kafka 代理
val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",
        "34.86.170.127:9092")
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", false)
      .load()

这给了我以下错误

 java.net.UnknownHostException: kafka-0.us-west2-c.c.civic-animal-213016.internal: nodename nor servname provided, or not known

我觉得这个错误很有趣,因为它以某种方式计算出了我在 GCP 上的主机名,即使我只提供了它。(这可以从 zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0" 中得到确认)

所以我进一步研究了这个问题并发现了这个博客 post。 https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ 并了解了广告听众。然后我阅读了以下 Whosebug 交换。

第二种方法

我尝试了答案。所以我进入我的 kafka-0 代理实例和 运行 以下命令

vi kafka_2.12-2.0.0/config/server.properties

然后我取消注释广告听众并更改

#advertised.listeners=PLAINTEXT://localhost:9092

advertised.listeners=PLAINTEXT://[instance_public_id_address]:9092

所以这解决了

advertised.listeners=PLAINTEXT://kafka-0.us-west2-c.c.civic-animal-213016.internal:9092

然而,这仍然给我同样的错误

java.net.UnknownHostException: kafka-0.us-west2-c.c.civic-animal-213016.internal: nodename nor servname provided, or not known

所以尝试 3,我尝试更改而不是使用外部 IP 连接,我可以使用主机名代替

val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",
        "kafka-0.us-west2-c.c.civic-animal-213016.internal:9092")
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", false)
      .load()

但我收到以下错误。

Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

我对如何解决这个问题感到困惑?任何帮助我走得更远的步骤

原来我对为 advertised.listeners=PLAINTEXT://[instance_public_id_address]:9092

输入什么感到困惑

我变了

advertised.listeners=PLAINTEXT://kafka-0.us-west2-c.c.civic-animal-213016.internal:9092

至此

advertised.listeners=PLAINTEXT://34.86.170.127:9092

其中 34.86.170.127:9092 是我在 VM 实例上的外部 IP。