How to know the name of Kafka consumer group that streaming query uses for Kafka data source?

I am consuming data from a Kafka topic with 3 partitions through Spark Structured Streaming. Since Spark Structured Streaming does not let you explicitly provide a group.id and assigns some random ID to the consumer, I tried to check the consumer group IDs with the Kafka command below:

./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list

Output:
 spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
 spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
 spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0

Here are my questions:

1) Why does it create 3 consumer groups? Is it because of the 3 partitions?

2) Is there any way I can get these consumer group names in my Spark application?

3) Even though my Spark application was still running, after some time these group names no longer showed up in the consumer-groups list. Is this because all the data was consumed by the Spark application and there was no more data left in that Kafka topic?

4) If my assumption in point 3 is right, will a new consumer group ID be created if new data arrives, or will the name of the consumer group stay the same?

Below is my read stream:

  val inputDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("subscribe", topic)
    // .option("assign", " {\""+topic+"\":[0]}")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 60000)
    .load()

And I have 3 write streams in the application, as shown below:

  val df = inputDf.selectExpr("CAST(value AS STRING)","CAST(topic AS STRING)","CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)") 
  val df1 = inputDf.selectExpr("CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")

// First stream
val checkpoint_loc1 = "/warehouse/test_duplicate/download/chk1"
df1.agg(min("offset"), max("offset"))
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .option("checkpointLocation", checkpoint_loc1)
  .start()

val result = df.select(
  df1("result").getItem("_1").as("col1"),
  df1("result").getItem("_2").as("col2"),
  df1("result").getItem("_5").as("eventdate"))
val distDates = result.select(result("eventdate")).distinct

// Second stream
val checkpoint_loc2 = "/warehouse/test_duplicate/download/chk2"
distDates.writeStream
  .foreach(writer1)
  .option("checkpointLocation", checkpoint_loc2)
  .start()

// Third stream
val kafkaOutput = result.writeStream
  .outputMode("append")
  .format("orc")
  .option("path", data_dir)
  .option("checkpointLocation", checkpoint_loc3)
  .start()

The streaming query is used only once in the code, and there are no joins.

Execution plan:

== Parsed Logical Plan ==
 StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

1) Why does it create 3 consumer groups? Is it because of 3 partitions?

Certainly not. It is merely a coincidence. You seem to have run the application 3 times, and the topic happens to have 3 partitions.

Let's start over from the very beginning.

I deleted all the consumer groups to make sure we start afresh.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0

$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.

$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
// nothing got printed out

I created a topic with 5 partitions.

$ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
Created topic "jacek-five-partitions".

$ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
Topic:jacek-five-partitions PartitionCount:5    ReplicationFactor:1 Configs:
    Topic: jacek-five-partitions    Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 4    Leader: 0   Replicas: 0 Isr: 0

The code I used is as follows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SparkApp extends App {

  val spark = SparkSession.builder.master("local[*]").getOrCreate()
  import spark.implicits._
  val q = spark
    .readStream
    .format("kafka")
    .option("startingoffsets", "latest")
    .option("subscribe", "jacek-five-partitions")
    .option("kafka.bootstrap.servers", ":9092")
    .load
    .select($"value" cast "string")
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start
  q.awaitTermination()
}

When I ran the above Spark Structured Streaming application, only one consumer group got created.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0

That makes sense, because all the Spark processing should use as many Kafka consumers as there are partitions, but regardless of the number of consumers there should be only one consumer group (otherwise each Kafka consumer would consume all the records and there would be duplicates).


2) Is there any way I can get these consumer group names in spark application?

There is no public API for this, so the answer is no.

You could, however, "hack" Spark and go below the public API down to the internal Kafka consumer that uses this line:

val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

Or even this line, to be more precise:

val kafkaOffsetReader = new KafkaOffsetReader(
  strategy(caseInsensitiveParams),
  kafkaParamsForDriver(specifiedKafkaParams),
  parameters,
  driverGroupIdPrefix = s"$uniqueGroupId-driver")

It would simply be a matter of finding the KafkaMicroBatchReader of the Kafka data source and asking it for the KafkaOffsetReader, which knows the groupId. That seems doable.
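
A less invasive workaround, which does not touch Spark internals, is to ask the Kafka cluster itself and filter the groups by the spark-kafka-source prefix that the line above generates (the prefix is an implementation detail and may change between Spark versions). A minimal sketch with the Kafka AdminClient, assuming kafka-clients is on the classpath and using a placeholder bootstrap server:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder

val admin = AdminClient.create(props)
try {
  // Same information as `kafka-consumer-groups.sh --list`, filtered to Spark-generated groups
  val sparkGroups = admin.listConsumerGroups().all().get().asScala
    .map(_.groupId())
    .filter(_.startsWith("spark-kafka-source-"))
  sparkGroups.foreach(println)
} finally {
  admin.close()
}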


Even though my spark application was still running, after some time these group names didn't show up in consumer groups list. Is this because all the data was consumed by spark application and there was no more data in that kafka topic?

Could that be related to KIP-211: Revise Expiration Semantics of Consumer Group Offsets:

The offset of a topic partition within a consumer group expires when the expiration timestamp associated with that partition is reached. This expiration timestamp is usually affected by the broker config offsets.retention.minutes, unless user overrides that default and uses a custom retention.
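
To check what retention actually applies on your cluster, one could read that broker property with the Kafka AdminClient; a minimal sketch, assuming broker id "0" and a placeholder bootstrap server:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder

val admin = AdminClient.create(props)
try {
  // Broker id "0" is an assumption; use the ids of your own brokers.
  val broker = new ConfigResource(ConfigResource.Type.BROKER, "0")
  val config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker)
  // How long committed offsets of an inactive group are kept before they expire
  println(config.get("offsets.retention.minutes").value())
} finally {
  admin.close()
}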


4) will it create new consumer group id if new data arrives or the name of consumer group will remain same?

It will remain the same.

Also, a consumer group must not be deleted while at least one consumer from the group is active.

group.id: Kafka source will create a unique group id for each query automatically. http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
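
If the goal is simply to get recognizable rather than random group names, newer Spark versions also document a groupIdPrefix source option that replaces the default spark-kafka-source prefix of the generated group ids; check the page linked above for whether your Spark version supports it. A minimal sketch, reusing the brokers and topic values from the question:

// Assumes a Spark version whose Kafka source supports the groupIdPrefix option;
// generated group ids would then start with "my-app" instead of "spark-kafka-source".
val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("groupIdPrefix", "my-app")
  .option("startingOffsets", "earliest")
  .load()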