Spring 引导 Kafka 侦听器与消费者
Spring Boot Kafka Listener vs Consumer
有什么区别?术语 KafkaConsumer 和 KafkaListener 可以互换使用吗?
@KafkaListener
是 ConcurrentMessageListenerContainer
的高级 API,它在 KafkaConsumer
周围生成了多个内部侦听器。
不同之处在于,KafkaConsumer
API 是 可轮询的 ,当您需要时调用它的 poll()
。侦听器抽象将围绕 poll()
进行无限循环,并且每当记录从 poll()
出现时,它都会为记录生成消息。我们有一个任务执行器,它运行这样的逻辑:
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
catch (Error e) { // NOSONAR - rethrown
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", e);
wrapUp();
throw e;
}
}
在pollAndInvoke();
中调用了KafkaConsumer.poll()
。
有什么区别?术语 KafkaConsumer 和 KafkaListener 可以互换使用吗?
@KafkaListener
是 ConcurrentMessageListenerContainer
的高级 API,它在 KafkaConsumer
周围生成了多个内部侦听器。
不同之处在于,KafkaConsumer
API 是 可轮询的 ,当您需要时调用它的 poll()
。侦听器抽象将围绕 poll()
进行无限循环,并且每当记录从 poll()
出现时,它都会为记录生成消息。我们有一个任务执行器,它运行这样的逻辑:
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
catch (Error e) { // NOSONAR - rethrown
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", e);
wrapUp();
throw e;
}
}
在pollAndInvoke();
中调用了KafkaConsumer.poll()
。