Spark Structured Streaming with Kafka SASL/PLAIN 身份验证
Spark Structured Streaming with Kafka SASL/PLAIN authentication
有没有办法将 Spark 结构化流作业连接到受 SASL/PLAIN 身份验证保护的 Kafka 集群?
我在想类似的事情:
val df2 = spark.read.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=...")
.option("subscribe", "topic1")
.load();
似乎虽然 Spark Structured Streaming 识别 kafka.bootstrap.servers
选项,但它不识别其他与 SASL 相关的选项。有什么不同的方法吗?
这是 PySpark 中的完整示例。
对于 test/dev,您可以在选项中内联 JAAS 配置。
options = {
"kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";',
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()
如果您在生产中使用此模式,您需要将 JAAS 配置放在一个文件中。为此,将确切的内容复制到名为 jaas.conf 的文件中并删除 jaas 密钥:
options = {
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()
然后提供 spark-submit 的文件路径。例如:
spark-submit \
--driver-java-options -Djava.security.auth.login.config=/path/to/jaas.conf \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 yourapp.py
您需要为您的应用程序选择正确的路径和版本。
有没有办法将 Spark 结构化流作业连接到受 SASL/PLAIN 身份验证保护的 Kafka 集群?
我在想类似的事情:
val df2 = spark.read.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=...")
.option("subscribe", "topic1")
.load();
似乎虽然 Spark Structured Streaming 识别 kafka.bootstrap.servers
选项,但它不识别其他与 SASL 相关的选项。有什么不同的方法吗?
这是 PySpark 中的完整示例。
对于 test/dev,您可以在选项中内联 JAAS 配置。
options = {
"kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";',
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()
如果您在生产中使用此模式,您需要将 JAAS 配置放在一个文件中。为此,将确切的内容复制到名为 jaas.conf 的文件中并删除 jaas 密钥:
options = {
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()
然后提供 spark-submit 的文件路径。例如:
spark-submit \
--driver-java-options -Djava.security.auth.login.config=/path/to/jaas.conf \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 yourapp.py
您需要为您的应用程序选择正确的路径和版本。