kafka中如何创建新的消费者组

How to create a new consumer group in kafka

我在本地使用 运行 kafka,遵循快速入门指南 here

的说明

然后我在 config/consumer.properties 中定义了我的消费者组配置,以便我的消费者可以从定义的 group.id

中选择消息

运行以下命令,

bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

结果,

test-consumer-group  <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh

我能够通过基于 python 的消费者连接到 kafka,该消费者被配置为使用提供 group.idtest-consumer-group

首先,我无法理解how/whenkafka创建消费组。似乎它在某个时间点加载了 conf/consumer.properties,另外它在通过 kafka-console-consumer.sh.

连接时隐式创建了消费者组(在我的例子中是 console-consumer-67807

我怎样才能明确地创建我自己的消费者组,比方说 my-created-consumer-group

您不会显式创建消费者组,而是构建始终属于某个消费者组的消费者。无论您使用哪种技术(Spark、Spring、Flink 等),每个 Kafka 消费者都会有一个消费者组。消费者组可以为每个人配置消费者。

It seems it loads the conf/consumer.properties at some point of time and additionally it implicitly creates consumer-group (in my case console-consumer-67807) when connecting via kafka-console-consumer.sh

如果您不告诉您的控制台用户实际使用该文件,则不会考虑该文件。

提供消费者组名称的备选方案如下:

带有 属性 文件的控制台消费者 (--consumer.config)

文件 config/consumer.properties 应该是这样的

# consumer group id
group.id=my-created-consumer-group

这就是您如何确保控制台消费者考虑到这个 group.id

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties

使用 --group

的控制台消费者

对于控制台消费者,消费者组会自动创建,前缀为“console-consumer”,后缀类似于 PID,除非您通过添加 --group:

来提供自己的消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group

基于代码的标准消费者API

使用标准 JAVA/Scala/...消费者 API 您可以通过属性提供消费者组:

Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
// set more properties

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.subscribe(Arrays.asList("test-topic")