Kafka console consumer commits wrong offset when using --max-messages

I have a Kafka console consumer, version 1.1.0, that I use to fetch messages from Kafka. When I run the kafka-console-consumer.sh script with the --max-messages option, it seems to commit the wrong offsets.

I created a topic and a consumer group and read some messages:

/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.offset     1          374             374             0               -               -               -
test.offset     0          0               375             375             -               -               -

Then I read 10 messages like this:

/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
1 var_1
3 var_3
5 var_5
7 var_7
9 var_9
11 var_11
13 var_13
15 var_15
17 var_17
19 var_19
Processed a total of 10 messages

But now the offsets show that it has read all the messages in the topic:

/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'my-consumer-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.offset     1          374             374             0               -               -               -
test.offset     0          375             375             0               -               -               -

Now, when I try to read more messages, I get an error suggesting there are no more messages in the topic:

/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
[2020-02-28 08:27:54,782] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
kafka.consumer.ConsumerTimeoutException
        at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:98)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:129)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

What am I doing wrong? Why did the offset move to the last message in the topic instead of advancing by just 10 messages?

This is caused by the Kafka consumer's auto-commit feature. As stated in this link:

The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms. Just like everything else in the consumer, the automatic commits are driven by the poll loop. Whenever you poll, the consumer checks if it is time to commit, and if it is, it will commit the offsets it returned in the last poll.

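So in your case, when your consumer polls, it receives up to 500 messages (the default value of max.poll.records), and when it auto-commits it commits the largest offset returned by the last poll (375 in your case), even though you set --max-messages to 10.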

--max-messages: The maximum number of messages to consume before exiting. If not set, consumption is continual.
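
To make the arithmetic concrete, here is a minimal toy model in Python. It is not the Kafka client: ToyConsumer and consume are hypothetical names invented for illustration, and the model assumes a single partition and that an auto-commit covers everything poll() has already handed to the client, as the quoted passage describes.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyConsumer:
    """Toy stand-in for a consumer on one partition (hypothetical, not the Kafka API)."""
    log_end_offset: int
    max_poll_records: int = 500  # mirrors the default mentioned in the answer
    position: int = 0            # next offset the consumer would fetch
    committed: int = 0           # last offset committed for the group

    def poll(self) -> List[int]:
        """Return up to max_poll_records offsets and advance the position past them."""
        end = min(self.position + self.max_poll_records, self.log_end_offset)
        batch = list(range(self.position, end))
        self.position = end
        return batch

    def auto_commit(self) -> None:
        """Commit everything poll() has returned so far, printed or not."""
        self.committed = self.position


def consume(consumer: ToyConsumer, max_messages: int) -> int:
    """Process at most max_messages records, then let the auto-commit run."""
    processed = 0
    while processed < max_messages:
        batch = consumer.poll()
        if not batch:
            break  # nothing left in the log
        for _offset in batch:
            processed += 1
            if processed == max_messages:
                break  # the rest of the batch is dropped but already "received"
    consumer.auto_commit()
    return processed


default = ToyConsumer(log_end_offset=375)
print(consume(default, 10), default.committed)  # 10 375

capped = ToyConsumer(log_end_offset=375, max_poll_records=10)
print(consume(capped, 10), capped.committed)    # 10 10
```

In this model the first run processes 10 records but commits 375, matching the output in the question, because the single poll already returned the whole partition. The second run suggests that capping max.poll.records at the same value as --max-messages (for example in consumer.properties) might keep the committed offset in step with what was printed; that is an assumption drawn from the model, not something verified against a 1.1.0 broker.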