kafka中如何创建新的消费者组
How to create a new consumer group in kafka
我在本地使用 运行 kafka,遵循快速入门指南 here、
的说明
然后我在 config/consumer.properties
中定义了我的消费者组配置,以便我的消费者可以从定义的 group.id
中选择消息
运行以下命令,
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
结果,
test-consumer-group <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh
我能够通过基于 python 的消费者连接到 kafka,该消费者被配置为使用提供 group.id
即 test-consumer-group
首先,我无法理解how/whenkafka创建消费组。似乎它在某个时间点加载了 conf/consumer.properties
,另外它在通过 kafka-console-consumer.sh
.
连接时隐式创建了消费者组(在我的例子中是 console-consumer-67807
)
我怎样才能明确地创建我自己的消费者组,比方说 my-created-consumer-group
?
您不会显式创建消费者组,而是构建始终属于某个消费者组的消费者。无论您使用哪种技术(Spark、Spring、Flink 等),每个 Kafka 消费者都会有一个消费者组。消费者组可以为每个人配置消费者。
It seems it loads the conf/consumer.properties at some point of time and additionally it implicitly creates consumer-group (in my case console-consumer-67807) when connecting via kafka-console-consumer.sh
如果您不告诉您的控制台用户实际使用该文件,则不会考虑该文件。
提供消费者组名称的备选方案如下:
带有 属性 文件的控制台消费者 (--consumer.config)
文件 config/consumer.properties
应该是这样的
# consumer group id
group.id=my-created-consumer-group
这就是您如何确保控制台消费者考虑到这个 group.id
:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties
使用 --group
的控制台消费者
对于控制台消费者,消费者组会自动创建,前缀为“console-consumer”,后缀类似于 PID,除非您通过添加 --group
:
来提供自己的消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group
基于代码的标准消费者API
使用标准 JAVA/Scala/...消费者 API 您可以通过属性提供消费者组:
Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
// set more properties
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.subscribe(Arrays.asList("test-topic")
我在本地使用 运行 kafka,遵循快速入门指南 here、
的说明然后我在 config/consumer.properties
中定义了我的消费者组配置,以便我的消费者可以从定义的 group.id
运行以下命令,
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
结果,
test-consumer-group <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh
我能够通过基于 python 的消费者连接到 kafka,该消费者被配置为使用提供 group.id
即 test-consumer-group
首先,我无法理解how/whenkafka创建消费组。似乎它在某个时间点加载了 conf/consumer.properties
,另外它在通过 kafka-console-consumer.sh
.
console-consumer-67807
)
我怎样才能明确地创建我自己的消费者组,比方说 my-created-consumer-group
?
您不会显式创建消费者组,而是构建始终属于某个消费者组的消费者。无论您使用哪种技术(Spark、Spring、Flink 等),每个 Kafka 消费者都会有一个消费者组。消费者组可以为每个人配置消费者。
It seems it loads the conf/consumer.properties at some point of time and additionally it implicitly creates consumer-group (in my case console-consumer-67807) when connecting via kafka-console-consumer.sh
如果您不告诉您的控制台用户实际使用该文件,则不会考虑该文件。
提供消费者组名称的备选方案如下:
带有 属性 文件的控制台消费者 (--consumer.config)
文件 config/consumer.properties
应该是这样的
# consumer group id
group.id=my-created-consumer-group
这就是您如何确保控制台消费者考虑到这个 group.id
:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties
使用 --group
的控制台消费者对于控制台消费者,消费者组会自动创建,前缀为“console-consumer”,后缀类似于 PID,除非您通过添加 --group
:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group
基于代码的标准消费者API
使用标准 JAVA/Scala/...消费者 API 您可以通过属性提供消费者组:
Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
// set more properties
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.subscribe(Arrays.asList("test-topic")