使用命令行在kafka中消费消息时如何设置组名?

How to set group name when consuming messages in kafka using command line?

知道如何在使用命令行在 kafka 中使用消息时设置组名。

我尝试使用以下命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --config group.id=test1
'config' is not a recognized option

目标是使用以下命令找到已消费消息的偏移量:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test1

这方面有人可以帮忙吗!!

提前致谢!!

从命令提示符处得到更改组名的答案!!

步数:

  1. 创建一个新的 consumer.properties 文件,比如 consumer1.properties
  2. consumer1.properties 中更改 group.id=<Give a new group name>
  3. bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer.config config/consumer1.properties --delete-consumer-offsets

如果您想要更改组 ID 而不会丢失记录的偏移量,您可以手动获取当前 Group.id 的偏移量并设置为具有新 ID 的新 运行 消费者。如果没有任何控制权来获取消费者实例中的偏移量,您可以 运行 此命令。

/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server <ip_address>:<Broker_port>  --group Group_name --describe

然后你可以从特定的偏移量中寻找数据。注意你应该在调用 poll 之后调用 seek,Assign 命令不起作用。 你也可以在 github

中看到我的代码示例

Example here

最简单的解决方案是:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --consumer-property group.id=test1

如果您指定标志 --from-beginning 请记住,消费者组过去不应该消费任何记录,否则您的消费者将从指定组最早未使用的记录(而不是从实际开始,因为您可能 错误地假设)。

如果您正在使用 bash 那么您可以使用它的进程替换功能。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic nil_RF2_P2 --from-beginning \
--consumer.config <(echo group.id=test1)

您可以像这样使用 --group 选项(使用 Kafka 2.0.0 测试):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-consumer --topic test --from-beginning