Spring 中的 Apache Camel 可以从 Kafka 获取以前的消息吗?

Can Apache Camel inside Spring get previous messages from Kafka?

我有一个 Spring 引导应用程序,它通过 Apache Camel 使用来自 Apache Kafka 的消息

public void init() throws Exception {
    DefaultCamelContext camelContext = new DefaultCamelContext();
    camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("kafka:destination?brokers=<host>:9092&maxBlockMs=5000&reconnectBackoffMaxMs=2000")
                    .process(senderProcessor);
        }
    });

    camelContext.start();
}

如果上下文在消息到达时已经启动,它就可以工作。但是,如果在应用程序启动之前将消息发送到 Kafka,则该应用程序不会使用这些消息。我相信一定有办法得到它们。是真的吗?

有一个名为 auto.offset.resetKafka 消费者设置,其默认值为 latest

Camel-Kafka 中,设置名为 autoOffsetReset

这意味着消费者忽略所有现有消息并且只使用新消息。如果将其设置为 earliest,消费者将使用所有现有消息。

请注意,这仅在消费者首次连接时才重要。一旦连接并使用消息,它应该将偏移量提交回代理并在重新连接时使用这个保存的偏移量。