Spark structured streaming 无权访问组
Spark structured streaming Not authorized to access group
我正在尝试通过 spark 结构化流从 Kafka 读取数据。但是,在 Spark 2.4.0. 中,您无法为流设置组 ID(参见 )。
但是,由于未设置,spark 仅生成组 ID,我被困在 GroupAuthorizationException:
19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2
有什么办法可以绕过这个吗?有趣的是,我可以通过 kafka-console-consumer.sh 读取这些数据,我可以在 .properties 文件中传递组 ID。
抛出异常的代码:
val df = spark
.readStream
.format("kafka")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("kafka.group.id", "idThatShouldBeUsed")
.option("kafka.bootstrap.servers", "server")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.ssl.truststore.location", "/location)
.option("kafka.ssl.truststore.password", "pass")
.option("kafka.sasl.jaas.config", """jaasToUse""")
.load()
.writeStream
.outputMode("append")
.format("console")
.option("startingOffsets", "earliest")
.start().awaitTermination()
从消费者的角度来看,这似乎是无法解决的。我们最终不得不使用 bin/kafka-acls.sh 和通配符来允许结构化流生成的所有组 ID。
kafka acl 示例:
bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zk:2181 --add --allow-principal User:'Bon' --operation READ --topic topicName --group='spark-kafka-source-' --resource-pattern-type prefixed
我正在尝试通过 spark 结构化流从 Kafka 读取数据。但是,在 Spark 2.4.0. 中,您无法为流设置组 ID(参见
但是,由于未设置,spark 仅生成组 ID,我被困在 GroupAuthorizationException:
19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2
有什么办法可以绕过这个吗?有趣的是,我可以通过 kafka-console-consumer.sh 读取这些数据,我可以在 .properties 文件中传递组 ID。
抛出异常的代码:
val df = spark
.readStream
.format("kafka")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("kafka.group.id", "idThatShouldBeUsed")
.option("kafka.bootstrap.servers", "server")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.ssl.truststore.location", "/location)
.option("kafka.ssl.truststore.password", "pass")
.option("kafka.sasl.jaas.config", """jaasToUse""")
.load()
.writeStream
.outputMode("append")
.format("console")
.option("startingOffsets", "earliest")
.start().awaitTermination()
从消费者的角度来看,这似乎是无法解决的。我们最终不得不使用 bin/kafka-acls.sh 和通配符来允许结构化流生成的所有组 ID。
kafka acl 示例:
bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zk:2181 --add --allow-principal User:'Bon' --operation READ --topic topicName --group='spark-kafka-source-' --resource-pattern-type prefixed