使用命令行在kafka中消费消息时如何设置组名?
How to set group name when consuming messages in kafka using command line?
知道如何在使用命令行在 kafka 中使用消息时设置组名。
我尝试使用以下命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --config group.id=test1
'config' is not a recognized option
目标是使用以下命令找到已消费消息的偏移量:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test1
这方面有人可以帮忙吗!!
提前致谢!!
从命令提示符处得到更改组名的答案!!
步数:
- 创建一个新的
consumer.properties
文件,比如 consumer1.properties
。
- 在
consumer1.properties
中更改 group.id=<Give a new group name>
。
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer.config config/consumer1.properties --delete-consumer-offsets
如果您想要更改组 ID 而不会丢失记录的偏移量,您可以手动获取当前 Group.id 的偏移量并设置为具有新 ID 的新 运行 消费者。如果没有任何控制权来获取消费者实例中的偏移量,您可以 运行 此命令。
/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server <ip_address>:<Broker_port> --group Group_name --describe
然后你可以从特定的偏移量中寻找数据。注意你应该在调用 poll 之后调用 seek,Assign 命令不起作用。
你也可以在 github
中看到我的代码示例
最简单的解决方案是:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --consumer-property group.id=test1
如果您指定标志 --from-beginning 请记住,消费者组过去不应该消费任何记录,否则您的消费者将从指定组最早未使用的记录(而不是从实际开始,因为您可能
错误地假设)。
如果您正在使用 bash 那么您可以使用它的进程替换功能。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic nil_RF2_P2 --from-beginning \
--consumer.config <(echo group.id=test1)
您可以像这样使用 --group 选项(使用 Kafka 2.0.0 测试):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-consumer --topic test --from-beginning
知道如何在使用命令行在 kafka 中使用消息时设置组名。
我尝试使用以下命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --config group.id=test1
'config' is not a recognized option
目标是使用以下命令找到已消费消息的偏移量:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test1
这方面有人可以帮忙吗!!
提前致谢!!
从命令提示符处得到更改组名的答案!!
步数:
- 创建一个新的
consumer.properties
文件,比如consumer1.properties
。 - 在
consumer1.properties
中更改group.id=<Give a new group name>
。 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer.config config/consumer1.properties --delete-consumer-offsets
如果您想要更改组 ID 而不会丢失记录的偏移量,您可以手动获取当前 Group.id 的偏移量并设置为具有新 ID 的新 运行 消费者。如果没有任何控制权来获取消费者实例中的偏移量,您可以 运行 此命令。
/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server <ip_address>:<Broker_port> --group Group_name --describe
然后你可以从特定的偏移量中寻找数据。注意你应该在调用 poll 之后调用 seek,Assign 命令不起作用。 你也可以在 github
中看到我的代码示例最简单的解决方案是:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --consumer-property group.id=test1
如果您指定标志 --from-beginning 请记住,消费者组过去不应该消费任何记录,否则您的消费者将从指定组最早未使用的记录(而不是从实际开始,因为您可能 错误地假设)。
如果您正在使用 bash 那么您可以使用它的进程替换功能。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic nil_RF2_P2 --from-beginning \
--consumer.config <(echo group.id=test1)
您可以像这样使用 --group 选项(使用 Kafka 2.0.0 测试):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-consumer --topic test --from-beginning