kafka中的默认消费者组ID

default consumer group id in kafka

我正在使用 Kafka 2.11 并且对它相当陌生。我试图了解 kafka 消费者群体,我有 3 个 spark 应用程序从同一主题消费,并且每个应用程序都接收来自该主题的所有消息。由于我没有在应用程序中提到任何消费者组 ID,我假设 Kafka 正在为每个消费者组分配一些不同的消费者组 ID。 我需要使用以下 command.As 为其中一个应用程序重置 kafka 偏移量我不知道我的应用程序的消费者组名称我有点卡在这里。我是否需要在应用程序中显式分配组 ID,然后在下面的命令中使用它?

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2017-11-1907:52:43:00:000 --group <group_name> --topic <topic_name> --execute

如果这是真的,我如何获得每个应用程序的消费者组 ID?我不能

消费者 group.id 是强制性的。如果你不设置 consumer group.id,你会得到异常。所以很明显,您将它设置在代码中的某处,或者您正在使用的框架或库在内部设置它。您应该始终自己设置 group.id

您可以使用以下命令获取消费者组ID:

bin/kafka-consumer-groups.sh  --list --bootstrap-server <kafka-broker-ip>:9092

As i have not mentioned any consumer group id in applications I'm assuming that Kafka is assigning some distinct consumer group id to each of them

Kafka 代理不会将消费者组名称分配给连接到它们的消费者。 当消费者连接并订阅一个主题时,它 "joins" 一个组。 如果您在未指定任何消费者组的情况下使用 Spark 应用程序,则意味着在某种程度上,您用于从 Spark 应用程序连接到 Kafka 的 library/framework 正在自行分配消费者组名称。

如果你去Spark代码可以找到KafkaSourceProvider class,它负责Kafka源reader,你可以看到生成了随机group.id:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister

  override def createSource(
    sqlContext: SQLContext,
    metadataPath: String,
    schema: Option[StructType],
    providerName: String,
    parameters: Map[String, String]): Source = {
      validateStreamOptions(parameters)
      // Each running query should use its own group id. Otherwise, the query may be only assigned
      // partial data since Kafka will assign partitions to multiple consumers having the same group
      // id. Hence, we should generate a unique id for each query.
      val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    ...
  }

您可以使用 spark-kafka-source 前缀搜索 group.id,但找不到特定组的 group.id。

要查找所有消费者组 ID,您可以使用以下命令: ./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --list

要检查消费者组偏移量,您可以使用以下命令: ./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --group=GROUP_ID --describe