How to use Spark Streaming with Kafka with Kerberos?
I'm having some issues while trying to consume messages from Kafka with a Spark Streaming application in a Kerberized Hadoop cluster. I tried both of the approaches listed here:
- Receiver-based approach: KafkaUtils.createStream
- Direct approach (no receivers): KafkaUtils.createDirectStream

The receiver-based approach (KafkaUtils.createStream) throws two types of exceptions (different exceptions depending on whether I run in local mode with --master local[*] or in YARN mode with --master yarn --deploy-mode client):
- a strange kafka.common.BrokerEndPointNotAvailableException in the Spark local application
- Zookeeper timeouts in the Spark on YARN application. Once I managed to make this work (connecting to Zookeeper successfully), but no messages were received

In both modes (local or YARN), the direct approach (KafkaUtils.createDirectStream) returns an unexplained EOFException (see details below).

My final goal is to launch a Spark Streaming job on YARN, so I will leave the Spark local job aside.
Here is my test environment:
- Cloudera CDH 5.7.0
- Spark 1.6.0
- Kafka 0.10.1.0

For testing purposes, I am working on a single-node cluster (hostname = quickstart.cloudera). For those interested in reproducing the tests, I am working on a custom Docker container based on cloudera/quickstart (Git repo).

Below is the sample code I used in a spark-shell session. Of course this code works when Kerberos is not enabled: messages produced with kafka-console-producer are received by the Spark application.
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

val ssc = new StreamingContext(sc, Seconds(5))
val topics = Map("test-kafka" -> 1)

// Receiver-based approach: the consumer connects through Zookeeper
def readFromKafkaReceiver(): Unit = {
  val kafkaParams = Map(
    "zookeeper.connect" -> "quickstart.cloudera:2181",
    "group.id" -> "gid1",
    "client.id" -> "cid1",
    "zookeeper.session.timeout.ms" -> "5000",
    "zookeeper.connection.timeout.ms" -> "5000"
  )
  val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
  stream.print
}

// Direct approach (no receiver): the consumer connects to the brokers directly
def readFromKafkaDirectStream(): Unit = {
  val kafkaDirectParams = Map(
    "bootstrap.servers" -> "quickstart.cloudera:9092",
    "group.id" -> "gid1",
    "client.id" -> "cid1"
  )
  val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topics.map(_._1).toSet)
  directStream.print
}

readFromKafkaReceiver() // or readFromKafkaDirectStream()
ssc.start
Thread.sleep(20000)
ssc.stop(stopSparkContext = false, stopGracefully = true)
This code does not work once Kerberos is enabled. I followed this guide, Configuring Kafka Security, and created two configuration files:

jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/simpleuser/simpleuser.keytab"
  principal="simpleuser@CLOUDERA";
};
client.properties:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
I can produce messages:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-producer \
  --broker-list quickstart.cloudera:9092 \
  --topic test-kafka \
  --producer.config client.properties
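As a sanity check outside Spark (my addition, not part of the original post), the same JAAS file and client properties should also let the console consumer read the topic, since the Kafka 0.10 console consumer uses the new consumer API; something along these lines:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-consumer \
  --bootstrap-server quickstart.cloudera:9092 \
  --topic test-kafka \
  --consumer.config client.properties \
  --from-beginning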
But I can't consume those messages from the Spark Streaming application. To launch spark-shell in yarn-client mode, I just created a new JAAS configuration (jaas_with_zk_yarn.conf), with a Zookeeper section (Client), and with the keytab referenced by its filename only (the keytab is then passed through the --keytab option):
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="simpleuser.keytab"
  principal="simpleuser@CLOUDERA";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="simpleuser.keytab"
  principal="simpleuser@CLOUDERA";
};
This new file is passed through the --files option:
spark-shell --master yarn --deploy-mode client \
  --num-executors 2 \
  --files /home/simpleuser/jaas_with_zk_yarn.conf \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
  --keytab /home/simpleuser/simpleuser.keytab \
  --principal simpleuser
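As a side note (again my addition, not from the original post), the keytab and principal can be verified up front with the standard Kerberos tools:

kinit -kt /home/simpleuser/simpleuser.keytab simpleuser@CLOUDERA
klist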
I used the same code as before, except that I added two other Kafka parameters, corresponding to the contents of the consumer.properties file:
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka"
readFromKafkaReceiver() throws the following error once the Spark Streaming Context is started (it cannot connect to Zookeeper):
ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:2003)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:2003)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Sometimes the connection to ZK is established (the timeout is not reached), but then no messages are received.
readFromKafkaDirectStream() throws the following error as soon as the method is called:
org.apache.spark.SparkException: java.io.EOFException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)
There is no more explanation, just an EOFException. I presume there is a communication problem between Spark and the Kafka broker, but no further hint is given. I also tried metadata.broker.list instead of bootstrap.servers, without success.

Maybe I am missing something in the JAAS configuration files or in the Kafka parameters? Maybe the Spark options (extraJavaOptions) are invalid? I have tried so many possibilities that I am a bit lost. I would be glad if someone could help me solve at least one of these problems (direct approach or receiver-based). Thanks :)
Spark 1.6 does not support this, as stated in the Cloudera documentation:

Spark Streaming cannot consume from secure Kafka till it starts using Kafka 0.9 Consumer API

Spark Streaming in 1.6 uses the old consumer API, which does not support secure consumption. You can use Spark 2.1, which supports secure Kafka:
https://blog.cloudera.com/blog/2017/05/reading-data-securely-from-apache-kafka-to-apache-spark/
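For reference, with Spark 2.1 and the spark-streaming-kafka-0-10 integration (which is built on the new consumer API), a secure direct stream can be set up roughly like this. This is a minimal sketch under the same broker, topic, and JAAS assumptions as the question, not code from the post:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// With the new consumer API, deserializers are mandatory and the
// SASL/Kerberos settings go directly into the parameter map
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "quickstart.cloudera:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "gid1",
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.kerberos.service.name" -> "kafka"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Set("test-kafka"), kafkaParams)
)
stream.map(_.value).print()

The JAAS file still has to be distributed and referenced through --files and the extraJavaOptions settings shown in the question.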