Spring Kafka 的 MessageListener 在 MessageListenerContainer 停止后仍在消耗

Spring Kafka's MessageListener still consuming after MessageListenerContainer is stopped

我的目标是为 Spring Kafka 创建我自己的 Camel 组件。

我已经成功创建并开始使用了。我还希望能够停止组件和消耗(使用 JMX,使用其他 Camel 路由,...),而不丢失任何消息。 为此,当停止 Camel 组件时,我需要停止一个 MessageListenerContainer 并最终停止 MessageListener ,它在 MessageListenerContainer.

中注册

我的问题是当 MessageListenerContainer 停止时,MessageListener 仍在处理消息。

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        if (kafkaMessageListenerContainer != null) {
            return;
        }

        kafkaMessageListenerContainer = kafkaListenerContainerFactory.createContainer(endpoint.getTopicName());
        kafkaMessageListenerContainer.setupMessageListener(messageListener());
        kafkaMessageListenerContainer.start();
    }

    @Override
    protected void doStop() throws Exception {
        LOG.info("STOPPING kafkaMessageListenerContainer");
        kafkaMessageListenerContainer.stop();
        LOG.info("STOPPED kafkaMessageListenerContainer");
        super.doStop();
    }

    private MessageListener<Object, Object> messageListener() {
        return new MessageListener<Object, Object>() {
            @Override public void onMessage(ConsumerRecord<Object, Object> data) {
                LOG.info("Record received: {}", data.offset());
                //...pass a message to Camel processing route
                LOG.info("Record processed: {}", data.offset());
            }
        };
    }

这是日志中的片段

{"time":"2020-11-27T14:01:57.047Z","message":"Record received: 2051","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"c5efc5db-5981-4477-925a-83ffece49572"}
{"time":"2020-11-27T14:01:57.153Z","message":"STOPPED kafkaMessageListenerContainer","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"Camel (camelContext) thread #2 - ShutdownTask","level":"INFO"}
{"time":"2020-11-27T14:01:57.153Z","message":"Route: testTopic.consumer shutdown complete, was consuming from: my-kafka://events.TestTopic","logger":"org.apache.camel.impl.DefaultShutdownStrategy","thread-id":"Camel (camelContext) thread #2 - ShutdownTask","level":"INFO"}
{"time":"2020-11-27T14:01:57.159Z","message":"Record processed: 2051","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"8c835691-ba8d-43c2-b3e0-90a2f768ed7f"}
{"time":"2020-11-27T14:01:57.165Z","message":"Record received: 2052","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"8c835691-ba8d-43c2-b3e0-90a2f768ed7f"}
{"time":"2020-11-27T14:01:57.275Z","message":"Record processed: 2052","logger":"com.my.lib.springboot.camel.component.kafka.KafkaAdapterConsumer","thread-id":"consumer-0-C-1","level":"INFO","tId":"f7bcebb4-9e5e-46a1-bc5b-569264914b05"}
...

我希望 MessageListenerMessageListenerContainer 正常停止后不会再消耗。我一定是遗漏了什么,有什么建议吗?

非常感谢!

我发现了导致我遇到问题的问题。

出于某种原因,我覆盖了不正确的 consumerFactory。

        @Bean
        public ConsumerFactory<Object, Object> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  org.apache.kafka.common.serialization.StringDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  org.apache.kafka.common.serialization.StringDeserializer);
            return new DefaultKafkaConsumerFactory<>(props);
        }

删除它并使用 application.yml 中配置的默认设置后,问题已解决。