如何从具有特定偏移量的kafka主题获取消息
How to get message from a kafka topic with a specific offset
我们有一个包含 3 个 kafka 代理的 HDP 集群(来自 hortonworks)
我们想要 运行 kafka 控制台消费者,以便从具有特定偏移量的主题获取一条消息
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo01:2181 --topic lopet.lo.pm--partition 0 --offset 34537263 --max-messages 1
但我们得到以下信息:
我们哪里错了?
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Partition-offset based consumption is supported in the new consumer only.
Option Description
------ -----------
--blacklist <blacklist> Blacklist of topics to exclude from
consumption.
--bootstrap-server <server to connect REQUIRED (unless old consumer is
to> used): The server to connect to.
--consumer-property <consumer_prop> A mechanism to pass user-defined
properties in the form key=value to
the consumer.
--consumer.config <config file> Consumer config properties file. Note
that [consumer-property] takes
precedence over this config.
--csv-reporter-enabled If set, the CSV metrics reporter will
be enabled
--delete-consumer-offsets If specified, the consumer path in
zookeeper is deleted when starting up
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--key-deserializer <deserializer for
key>
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--metrics-dir <metrics directory> If csv-reporter-enable is set, and
this parameter isset, the csv
metrics will be outputed here
--new-consumer Use the new consumer implementation.
This is the default.
--offset <consume offset> The offset id to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
Partition-offset based consumption is supported in the new consumer only.
kafka-console-consumer
应使用 --bootstrap-server
,如警告所述。
并且您在 --partition
之前缺少 space
但除此之外,--partition x --offset y
是正确的。
完全命令
kafka-console-consumer \
--bootstrap-server kafka0:9092 \
--topic lopet.lo.pm \
--partition 0 \
--offset 34537263 \
--max-messages 1
使用 kcat
是另一种选择,如果你想安装它
如果有人想以编程方式使用 spring java。你可以阅读我的博客来了解这个想法。
https://rcvaram.medium.com/kafka-customer-get-what-needs-only-45d95f9b1105
Spring-Kafka 通过监听器为消费者提供抽象层。
要消费特定的偏移量消息,您必须按照以下步骤操作。
- 我们必须使用消费者 class 的
consumer.assign
方法将消费者分配给主题分区。
- 分配消费者后,我们必须使用
consume.seek
. 恢复消费者时需要查找的查找偏移量
- 我们可以恢复消费者,在poll方法的帮助下,它会消费来自kafka的消息。在此之前,我们的 consumerFactory 应该已经将
max.poll.records
设置为 1。然后消费者将 return 只有一条记录给我们。
- 最后,我们可以提交并暂停消费者。
如果您打算使用 Kafka 消费者,那么您也应该了解并发进程,因为 Kafka 消费者不是 thread-safe.
我的代码也在 my GitHub repository and I published this in the maven repository 中。
我们有一个包含 3 个 kafka 代理的 HDP 集群(来自 hortonworks)
我们想要 运行 kafka 控制台消费者,以便从具有特定偏移量的主题获取一条消息
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo01:2181 --topic lopet.lo.pm--partition 0 --offset 34537263 --max-messages 1
但我们得到以下信息:
我们哪里错了?
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Partition-offset based consumption is supported in the new consumer only.
Option Description
------ -----------
--blacklist <blacklist> Blacklist of topics to exclude from
consumption.
--bootstrap-server <server to connect REQUIRED (unless old consumer is
to> used): The server to connect to.
--consumer-property <consumer_prop> A mechanism to pass user-defined
properties in the form key=value to
the consumer.
--consumer.config <config file> Consumer config properties file. Note
that [consumer-property] takes
precedence over this config.
--csv-reporter-enabled If set, the CSV metrics reporter will
be enabled
--delete-consumer-offsets If specified, the consumer path in
zookeeper is deleted when starting up
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--key-deserializer <deserializer for
key>
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--metrics-dir <metrics directory> If csv-reporter-enable is set, and
this parameter isset, the csv
metrics will be outputed here
--new-consumer Use the new consumer implementation.
This is the default.
--offset <consume offset> The offset id to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
Partition-offset based consumption is supported in the new consumer only.
kafka-console-consumer
应使用 --bootstrap-server
,如警告所述。
并且您在 --partition
但除此之外,--partition x --offset y
是正确的。
完全命令
kafka-console-consumer \
--bootstrap-server kafka0:9092 \
--topic lopet.lo.pm \
--partition 0 \
--offset 34537263 \
--max-messages 1
使用 kcat
是另一种选择,如果你想安装它
如果有人想以编程方式使用 spring java。你可以阅读我的博客来了解这个想法。
https://rcvaram.medium.com/kafka-customer-get-what-needs-only-45d95f9b1105
Spring-Kafka 通过监听器为消费者提供抽象层。
要消费特定的偏移量消息,您必须按照以下步骤操作。
- 我们必须使用消费者 class 的
consumer.assign
方法将消费者分配给主题分区。 - 分配消费者后,我们必须使用
consume.seek
. 恢复消费者时需要查找的查找偏移量
- 我们可以恢复消费者,在poll方法的帮助下,它会消费来自kafka的消息。在此之前,我们的 consumerFactory 应该已经将
max.poll.records
设置为 1。然后消费者将 return 只有一条记录给我们。 - 最后,我们可以提交并暂停消费者。
如果您打算使用 Kafka 消费者,那么您也应该了解并发进程,因为 Kafka 消费者不是 thread-safe.
我的代码也在 my GitHub repository and I published this in the maven repository 中。