如何使用 Kafka 数据源(例如 Confluent Cloud 身份验证)指定流式查询的 Kafka 自定义配置?
How to specify Kafka custom configuration of streaming queries with Kafka data source (e.g. Confluent Cloud authentication)?
我想使用针对 Confluent Cloud 的结构化流进行读写。问题是我在文档中找不到进行身份验证的方法。
我有下一个数据连接:
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-nq5ga.westeurope.azure.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
security.protocol=SASL_SSL
当我在没有密码的情况下针对 localhost 进行测试时,我没有遇到任何问题。
val inputStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", inputTopic)
.option("startingOffsets", startingOffsetsValue)
.load()
outputStream.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", outputBrokers)
.option("topic", outputTopic)
.option("checkpointLocation", pathCheckpoint)
.start()
.awaitTermination()
有人知道怎么通过鉴权配置才能上confluent云
引用官方文档Kafka Specific Configurations:
Kafka’s own configurations can be set via DataStreamReader.option with kafka.
prefix, e.g, stream.option("kafka.bootstrap.servers", "host:port")
这样,我们就可以传递连接数据了,例如
.option("kafka.ssl.endpoint.identification.algorithm", "https")
我想使用针对 Confluent Cloud 的结构化流进行读写。问题是我在文档中找不到进行身份验证的方法。
我有下一个数据连接:
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-nq5ga.westeurope.azure.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
security.protocol=SASL_SSL
当我在没有密码的情况下针对 localhost 进行测试时,我没有遇到任何问题。
val inputStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", inputTopic)
.option("startingOffsets", startingOffsetsValue)
.load()
outputStream.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", outputBrokers)
.option("topic", outputTopic)
.option("checkpointLocation", pathCheckpoint)
.start()
.awaitTermination()
有人知道怎么通过鉴权配置才能上confluent云
引用官方文档Kafka Specific Configurations:
Kafka’s own configurations can be set via DataStreamReader.option with
kafka.
prefix, e.g,stream.option("kafka.bootstrap.servers", "host:port")
这样,我们就可以传递连接数据了,例如
.option("kafka.ssl.endpoint.identification.algorithm", "https")