将 spark 流从本地机器连接到 GCP 上的 kafka
connecting spark streaming from local machine to kafka on GCP
我目前在 GCP 上有 4 个虚拟机。 1 是托管我的动物园管理员的地方,另外 3 个是名为 kafka-0、kafka-1、kafka-2 的 kafka-broker。当 VMS 中的 producing/consuming 消息时一切正常,但是当我尝试从本地计算机连接到 Kafka 时它开始失败。首先,我为端口 9092(在所有实例上)打开了防火墙规则。然后我为每个实例添加了一个静态外部 IP。我正在尝试从本地 Spark 流作业连接到我的 kafka 代理。
只是为了完整性检查,我的动物园管理员能够连接到所有代理
i.ie 这个 bash 命令 运行 在我的 zookeeper 上(10.150.0.6:2181 是 zookeeper 的内部 IP)
zookeeper-shell.sh 10.150.0.6:2181 ls /brokers/ids
给我以下输出
Connecting to 10.150.0.6:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2]
我的第一个方法。我尝试使用代理的外部 IP
连接到其中一个 kafka 代理
val df = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers",
"34.86.170.127:9092")
.option("subscribe", KAFKA_TOPIC_NAME_CONS)
.option("startingOffsets", "latest")
.option("failOnDataLoss", false)
.load()
这给了我以下错误
java.net.UnknownHostException: kafka-0.us-west2-c.c.civic-animal-213016.internal: nodename nor servname provided, or not known
我觉得这个错误很有趣,因为它以某种方式计算出了我在 GCP 上的主机名,即使我只提供了它。(这可以从 zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0
" 中得到确认)
所以我进一步研究了这个问题并发现了这个博客 post。
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ 并了解了广告听众。然后我阅读了以下 Whosebug 交换。
第二种方法
我尝试了答案。所以我进入我的 kafka-0 代理实例和 运行 以下命令
vi kafka_2.12-2.0.0/config/server.properties
然后我取消注释广告听众并更改
#advertised.listeners=PLAINTEXT://localhost:9092
到
advertised.listeners=PLAINTEXT://[instance_public_id_address]:9092
所以这解决了
advertised.listeners=PLAINTEXT://kafka-0.us-west2-c.c.civic-animal-213016.internal:9092
然而,这仍然给我同样的错误
java.net.UnknownHostException: kafka-0.us-west2-c.c.civic-animal-213016.internal: nodename nor servname provided, or not known
所以尝试 3,我尝试更改而不是使用外部 IP 连接,我可以使用主机名代替
val df = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers",
"kafka-0.us-west2-c.c.civic-animal-213016.internal:9092")
.option("subscribe", KAFKA_TOPIC_NAME_CONS)
.option("startingOffsets", "latest")
.option("failOnDataLoss", false)
.load()
但我收到以下错误。
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
我对如何解决这个问题感到困惑?任何帮助我走得更远的步骤
原来我对为 advertised.listeners=PLAINTEXT://[instance_public_id_address]:9092
输入什么感到困惑
我变了
advertised.listeners=PLAINTEXT://kafka-0.us-west2-c.c.civic-animal-213016.internal:9092
至此
advertised.listeners=PLAINTEXT://34.86.170.127:9092
其中 34.86.170.127:9092 是我在 VM 实例上的外部 IP。
我目前在 GCP 上有 4 个虚拟机。 1 是托管我的动物园管理员的地方,另外 3 个是名为 kafka-0、kafka-1、kafka-2 的 kafka-broker。当 VMS 中的 producing/consuming 消息时一切正常,但是当我尝试从本地计算机连接到 Kafka 时它开始失败。首先,我为端口 9092(在所有实例上)打开了防火墙规则。然后我为每个实例添加了一个静态外部 IP。我正在尝试从本地 Spark 流作业连接到我的 kafka 代理。
只是为了完整性检查,我的动物园管理员能够连接到所有代理
i.ie 这个 bash 命令 运行 在我的 zookeeper 上(10.150.0.6:2181 是 zookeeper 的内部 IP)
zookeeper-shell.sh 10.150.0.6:2181 ls /brokers/ids
给我以下输出
Connecting to 10.150.0.6:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2]
我的第一个方法。我尝试使用代理的外部 IP
连接到其中一个 kafka 代理val df = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers",
"34.86.170.127:9092")
.option("subscribe", KAFKA_TOPIC_NAME_CONS)
.option("startingOffsets", "latest")
.option("failOnDataLoss", false)
.load()
这给了我以下错误
java.net.UnknownHostException: kafka-0.us-west2-c.c.civic-animal-213016.internal: nodename nor servname provided, or not known
我觉得这个错误很有趣,因为它以某种方式计算出了我在 GCP 上的主机名,即使我只提供了它。(这可以从 zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0
" 中得到确认)
所以我进一步研究了这个问题并发现了这个博客 post。 https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ 并了解了广告听众。然后我阅读了以下 Whosebug 交换。
第二种方法
我尝试了答案。所以我进入我的 kafka-0 代理实例和 运行 以下命令
vi kafka_2.12-2.0.0/config/server.properties
然后我取消注释广告听众并更改
#advertised.listeners=PLAINTEXT://localhost:9092
到
advertised.listeners=PLAINTEXT://[instance_public_id_address]:9092
所以这解决了
advertised.listeners=PLAINTEXT://kafka-0.us-west2-c.c.civic-animal-213016.internal:9092
然而,这仍然给我同样的错误
java.net.UnknownHostException: kafka-0.us-west2-c.c.civic-animal-213016.internal: nodename nor servname provided, or not known
所以尝试 3,我尝试更改而不是使用外部 IP 连接,我可以使用主机名代替
val df = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers",
"kafka-0.us-west2-c.c.civic-animal-213016.internal:9092")
.option("subscribe", KAFKA_TOPIC_NAME_CONS)
.option("startingOffsets", "latest")
.option("failOnDataLoss", false)
.load()
但我收到以下错误。
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
我对如何解决这个问题感到困惑?任何帮助我走得更远的步骤
原来我对为 advertised.listeners=PLAINTEXT://[instance_public_id_address]:9092
输入什么感到困惑我变了
advertised.listeners=PLAINTEXT://kafka-0.us-west2-c.c.civic-animal-213016.internal:9092
至此
advertised.listeners=PLAINTEXT://34.86.170.127:9092
其中 34.86.170.127:9092 是我在 VM 实例上的外部 IP。