Spring 引导 Kafka 侦听器与消费者

Spring Boot Kafka Listener vs Consumer

有什么区别?术语 KafkaConsumer 和 KafkaListener 可以互换使用吗?

@KafkaListenerConcurrentMessageListenerContainer 的高级 API,它在 KafkaConsumer 周围生成了多个内部侦听器。

不同之处在于,KafkaConsumer API 是 可轮询的 ,当您需要时调用它的 poll()。侦听器抽象将围绕 poll() 进行无限循环,并且每当记录从 poll() 出现时,它都会为记录生成消息。我们有一个任务执行器,它运行这样的逻辑:

        while (isRunning()) {
            try {
                pollAndInvoke();
            }
            catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // Ignore, we're stopping
            }
            catch (NoOffsetForPartitionException nofpe) {
                this.fatalError = true;
                ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
                break;
            }
            catch (Exception e) {
                handleConsumerException(e);
            }
            catch (Error e) { // NOSONAR - rethrown
                Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                if (runnable != null) {
                    runnable.run();
                }
                this.logger.error("Stopping container due to an Error", e);
                wrapUp();
                throw e;
            }
        }

pollAndInvoke();中调用了KafkaConsumer.poll()