如何找到 spark 结构化流应用程序的消费者组 ID?
How can I find the consumer group id of a spark structrued streaming application?
在spark streaming编程中,我们可以通过如下配置显式分配kafka消费者组id:
val kafkaParams = Map[String, Object](
...
"group.id" -> "use_a_separate_group_id_for_each_stream",
...
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
在 spark structured streaming 中禁止设置你自己的消费者组 id。
在我的程序中,我不会更改使用者组 ID,而是重复使用该组 ID 来为该组订阅的主题分区寻找最新的偏移量。
那么有什么方法可以让我在我的 spark 应用程序中隐式使用消费者组 ID 吗?
消费者组 ID 由 Apache Spark 在创建 rdd 时在内部生成:
// So that consumers in executors do not mess with any existing group id
.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
您可以在 KafkaSourceProvider.scala
上查看正在生成的内容。我不建议更改它,但可能会向下游流向其他静态消费者组
在spark streaming编程中,我们可以通过如下配置显式分配kafka消费者组id:
val kafkaParams = Map[String, Object](
...
"group.id" -> "use_a_separate_group_id_for_each_stream",
...
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
在 spark structured streaming 中禁止设置你自己的消费者组 id。
在我的程序中,我不会更改使用者组 ID,而是重复使用该组 ID 来为该组订阅的主题分区寻找最新的偏移量。
那么有什么方法可以让我在我的 spark 应用程序中隐式使用消费者组 ID 吗?
消费者组 ID 由 Apache Spark 在创建 rdd 时在内部生成:
// So that consumers in executors do not mess with any existing group id
.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
您可以在 KafkaSourceProvider.scala
上查看正在生成的内容。我不建议更改它,但可能会向下游流向其他静态消费者组