发生错误时如何关闭KafkaListener

How shutdown KafkaListener when error occurs

我是这样写的Listener

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@KafkaListener(containerFactory = "cdcKafkaListenerContainerFactory", errorHandler = "errorHandler")
public void consume(@Payload String message) throws Exception {
    ...
}

@Bean
public KafkaListenerErrorHandler errorHandler() {
    return ((message, e) -> {
        kafkaListenerEndpointRegistry.stop();
        return null;
    });
}

@KafkaListener 注释中,我指定了我的错误处理程序,它只是停止了消费者。 好像可以,但我有问题要问。

这个作用域是否有内置的 errorHandler?我读到 ContainerStoppingErrorHandler 可以使用,但我无法设置它,因为 @KafkaListener 的错误处理程序接受 KafkaListenerErrorHandler 类型的 bean。

我看到 kafkaListenerEndpointRegistry.stop();做一个优雅的停止。因此,在停止之前,已提交已消费消息的分区偏移量。 我想知道的是,当 kafkaListenerEndpointRegistry.stop(); 被调用时,在监听器肯定被关闭之前,另一条消息到达主题之前会发生什么? 这条消息被消费了吗?

我想象这个场景

time0: kafkaListenerEndpointRegistry.stop() is called
time1: a message is pushed into the listened topic
time2: kafkaListenerEndpointRegistry.stop() complete graceful stop

我担心消息可能会在时间 1 到达。在这种情况下会发生什么?

不要在侦听器中停止容器。

ContainerStoppingErrorHandler是在容器工厂设置的,不是注解。

如果您正在使用 Spring 引导,只需将错误处理程序声明为一个 bean,引导就会将其连接进来。

否则将错误处理程序添加到连接工厂 bean。

使用这个错误处理程序,抛出异常将立即停止容器。