如何在Structured Streaming中为kafka数据源中的消费者组设置group.id?

How to set group.id for consumer group in kafka data source in Structured Streaming?

我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我将需要强制执行特定的 group.id。但是,正如文档中所述,这是不可能的。 不过,在 databricks 文档 https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?

此外,通过查看 apache/spark 存储库 https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md 主分支的文档,我们可以了解到此类功能将在以后的 spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id?

如果没有,是否有任何 Spark 2.4.0 能够设置特定消费者的解决方法group.id?

目前 (v2.4.0) 不可以。

您可以在 Apache Spark 项目中检查以下行:

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L81 - 生成group.id

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 - 在用于创建 KafkaConsumer

的属性中设置它

在 master 分支 你可以找到修改,可以设置 prefix 或特定的 group.id

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - 根据组前缀(groupidprefix)生成group.id

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - 设置之前生成的 groupId,如果 kafka.group.id 没有在属性中传递

Structured Streaming guide 似乎很明确:

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

group.id: Kafka source will create a unique group id for each query automatically.

auto.offset.reset: Set the source option startingOffsets to specify where to start instead.

现在spark3.0可以指定group.id for kafka https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations

从 Spark 3.0.0 开始

根据 Structured Kafka Integration Guide,您可以提供 ConsumerGroup 作为选项 kafka.group.id:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "myConsumerGroup")
  .load()

但是,Spark 不会提交任何偏移量,因此您的 ConsumerGroups 的偏移量不会存储在 Kafka 的内部主题 __consumer_offsets 中,而是存储在 Spark 的检查点文件中。

能够设置 group.id 是为了处理 Kafka 的最新功能 Authorization using Role-Based Access Control,为此您的 ConsumerGroup 通常需要遵循命名约定。

讨论并解决了 Spark 3.x 应用程序设置 kafka.group.id 的完整示例