如何在Structured Streaming中为kafka数据源中的消费者组设置group.id?
How to set group.id for consumer group in kafka data source in Structured Streaming?
我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我将需要强制执行特定的 group.id。但是,正如文档中所述,这是不可能的。
不过,在 databricks 文档 https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?
此外,通过查看 apache/spark 存储库 https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md 主分支的文档,我们可以了解到此类功能将在以后的 spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id?
如果没有,是否有任何 Spark 2.4.0 能够设置特定消费者的解决方法group.id?
目前 (v2.4.0) 不可以。
您可以在 Apache Spark 项目中检查以下行:
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 - 在用于创建 KafkaConsumer
的属性中设置它
在 master 分支 你可以找到修改,可以设置 prefix 或特定的 group.id
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - 根据组前缀(groupidprefix
)生成group.id
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - 设置之前生成的 groupId,如果 kafka.group.id
没有在属性中传递
Structured Streaming guide 似乎很明确:
Note that the following Kafka params cannot be set and the Kafka
source or sink will throw an exception:
group.id: Kafka source will create a unique group id for each query
automatically.
auto.offset.reset: Set the source option
startingOffsets to specify where to start instead.
现在spark3.0可以指定group.id for kafka https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations
从 Spark 3.0.0 开始
根据 Structured Kafka Integration Guide,您可以提供 ConsumerGroup 作为选项 kafka.group.id
:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("kafka.group.id", "myConsumerGroup")
.load()
但是,Spark 不会提交任何偏移量,因此您的 ConsumerGroups 的偏移量不会存储在 Kafka 的内部主题 __consumer_offsets 中,而是存储在 Spark 的检查点文件中。
能够设置 group.id
是为了处理 Kafka 的最新功能 Authorization using Role-Based Access Control,为此您的 ConsumerGroup 通常需要遵循命名约定。
讨论并解决了 Spark 3.x 应用程序设置 kafka.group.id
的完整示例 。
我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我将需要强制执行特定的 group.id。但是,正如文档中所述,这是不可能的。 不过,在 databricks 文档 https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?
此外,通过查看 apache/spark 存储库 https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md 主分支的文档,我们可以了解到此类功能将在以后的 spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id?
如果没有,是否有任何 Spark 2.4.0 能够设置特定消费者的解决方法group.id?
目前 (v2.4.0) 不可以。
您可以在 Apache Spark 项目中检查以下行:
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 - 在用于创建 KafkaConsumer
在 master 分支 你可以找到修改,可以设置 prefix 或特定的 group.id
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - 根据组前缀(groupidprefix
)生成group.id
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - 设置之前生成的 groupId,如果 kafka.group.id
没有在属性中传递
Structured Streaming guide 似乎很明确:
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
group.id: Kafka source will create a unique group id for each query automatically.
auto.offset.reset: Set the source option startingOffsets to specify where to start instead.
现在spark3.0可以指定group.id for kafka https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations
从 Spark 3.0.0 开始
根据 Structured Kafka Integration Guide,您可以提供 ConsumerGroup 作为选项 kafka.group.id
:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("kafka.group.id", "myConsumerGroup")
.load()
但是,Spark 不会提交任何偏移量,因此您的 ConsumerGroups 的偏移量不会存储在 Kafka 的内部主题 __consumer_offsets 中,而是存储在 Spark 的检查点文件中。
能够设置 group.id
是为了处理 Kafka 的最新功能 Authorization using Role-Based Access Control,为此您的 ConsumerGroup 通常需要遵循命名约定。
讨论并解决了 Spark 3.x 应用程序设置 kafka.group.id
的完整示例