How to get message from a kafka topic with a specific offset

We have an HDP cluster (from Hortonworks) with 3 Kafka brokers.

We want to run the Kafka console consumer in order to fetch one message from a topic at a specific offset:

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo01:2181  --topic lopet.lo.pm--partition 0 --offset 34537263 --max-messages 1

But we get the output below. Where are we going wrong?

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Partition-offset based consumption is supported in the new consumer only.
Option                                   Description
------                                   -----------
--blacklist <blacklist>                  Blacklist of topics to exclude from
                                           consumption.
--bootstrap-server <server to connect    REQUIRED (unless old consumer is
  to>                                      used): The server to connect to.
--consumer-property <consumer_prop>      A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the consumer.
--consumer.config <config file>          Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--csv-reporter-enabled                   If set, the CSV metrics reporter will
                                           be enabled
--delete-consumer-offsets                If specified, the consumer path in
                                           zookeeper is deleted when starting up
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <class>                      The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--key-deserializer <deserializer for
  key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--metrics-dir <metrics directory>        If csv-reporter-enable is set, and
                                           this parameter isset, the csv
                                           metrics will be outputed here
--new-consumer                           Use the new consumer implementation.
                                           This is the default.
--offset <consume offset>                The offset id to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)

Partition-offset based consumption is supported in the new consumer only.

As the warning says, kafka-console-consumer should be run with --bootstrap-server.

You are also missing a space before --partition.

Other than that, --partition x --offset y is correct.


Full command:

kafka-console-consumer \
  --bootstrap-server kafka0:9092 \
  --topic lopet.lo.pm \
  --partition 0 \
  --offset 34537263 \
  --max-messages 1

Using kcat is another option, if you are willing to install it.
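For reference, a minimal kcat invocation for the same read could look like the line below (the broker kafka0:9092 is the assumed address from the command above; -C selects consumer mode, -p the partition, -o the start offset, -c the message count, -e exits once that count is reached):

kcat -b kafka0:9092 -C -t lopet.lo.pm -p 0 -o 34537263 -c 1 -e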

If anyone wants to do this programmatically with Spring and Java, you can read my blog post to get the idea:

https://rcvaram.medium.com/kafka-customer-get-what-needs-only-45d95f9b1105

Spring-Kafka provides an abstraction layer over the consumer through listeners.

To consume the message at a specific offset, you have to follow these steps (a sketch follows the list):

  1. Assign the consumer to the topic partition using the consumer's consumer.assign method.
  2. After assigning the consumer, we have to use consumer.seek.
  3. Seek to the offset from which consumption should resume.
  4. Resume the consumer; with the help of the poll method it will consume messages from Kafka. Before that, our consumerFactory should have max.poll.records set to 1, so the consumer returns only one record to us.
  5. Finally, we can commit and pause the consumer.
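As a rough illustration of those steps, here is a minimal sketch using the plain Apache Kafka Java client (the calls assign, seek, poll, commitSync and pause are what the steps above refer to); it is not the author's Spring-Kafka wiring, and the broker address, group id, topic, offset and class name are assumptions taken from this thread:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadOneOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-one-offset");      // hypothetical group id
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");            // step 4: at most one record per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("lopet.lo.pm", 0);
            consumer.assign(Collections.singletonList(tp));   // step 1: manual assignment, no group rebalance
            consumer.seek(tp, 34537263L);                     // steps 2-3: position at the wanted offset
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(10));    // step 4: fetch (at most one record here)
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
            consumer.commitSync();                            // step 5: commit ...
            consumer.pause(Collections.singletonList(tp));    // ... and pause the partition
        }
    }
}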

If you plan to use a Kafka consumer, you should also understand the concurrency implications, because Kafka consumers are not thread-safe.

My code is also in my GitHub repository, and I have published it to the Maven repository.