发生错误时如何关闭KafkaListener
How shutdown KafkaListener when error occurs
我是这样写的Listener
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@KafkaListener(containerFactory = "cdcKafkaListenerContainerFactory", errorHandler = "errorHandler")
public void consume(@Payload String message) throws Exception {
...
}
@Bean
public KafkaListenerErrorHandler errorHandler() {
return ((message, e) -> {
kafkaListenerEndpointRegistry.stop();
return null;
});
}
在 @KafkaListener
注释中,我指定了我的错误处理程序,它只是停止了消费者。
好像可以,但我有问题要问。
这个作用域是否有内置的 errorHandler?我读到 ContainerStoppingErrorHandler
可以使用,但我无法设置它,因为 @KafkaListener
的错误处理程序接受 KafkaListenerErrorHandler
类型的 bean。
我看到 kafkaListenerEndpointRegistry.stop()
;做一个优雅的停止。因此,在停止之前,已提交已消费消息的分区偏移量。
我想知道的是,当 kafkaListenerEndpointRegistry.stop();
被调用时,在监听器肯定被关闭之前,另一条消息到达主题之前会发生什么?
这条消息被消费了吗?
我想象这个场景
time0: kafkaListenerEndpointRegistry.stop() is called
time1: a message is pushed into the listened topic
time2: kafkaListenerEndpointRegistry.stop() complete graceful stop
我担心消息可能会在时间 1 到达。在这种情况下会发生什么?
不要在侦听器中停止容器。
ContainerStoppingErrorHandler
是在容器工厂设置的,不是注解。
如果您正在使用 Spring 引导,只需将错误处理程序声明为一个 bean,引导就会将其连接进来。
否则将错误处理程序添加到连接工厂 bean。
使用这个错误处理程序,抛出异常将立即停止容器。
我是这样写的Listener
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@KafkaListener(containerFactory = "cdcKafkaListenerContainerFactory", errorHandler = "errorHandler")
public void consume(@Payload String message) throws Exception {
...
}
@Bean
public KafkaListenerErrorHandler errorHandler() {
return ((message, e) -> {
kafkaListenerEndpointRegistry.stop();
return null;
});
}
在 @KafkaListener
注释中,我指定了我的错误处理程序,它只是停止了消费者。
好像可以,但我有问题要问。
这个作用域是否有内置的 errorHandler?我读到 ContainerStoppingErrorHandler
可以使用,但我无法设置它,因为 @KafkaListener
的错误处理程序接受 KafkaListenerErrorHandler
类型的 bean。
我看到 kafkaListenerEndpointRegistry.stop()
;做一个优雅的停止。因此,在停止之前,已提交已消费消息的分区偏移量。
我想知道的是,当 kafkaListenerEndpointRegistry.stop();
被调用时,在监听器肯定被关闭之前,另一条消息到达主题之前会发生什么?
这条消息被消费了吗?
我想象这个场景
time0: kafkaListenerEndpointRegistry.stop() is called
time1: a message is pushed into the listened topic
time2: kafkaListenerEndpointRegistry.stop() complete graceful stop
我担心消息可能会在时间 1 到达。在这种情况下会发生什么?
不要在侦听器中停止容器。
ContainerStoppingErrorHandler
是在容器工厂设置的,不是注解。
如果您正在使用 Spring 引导,只需将错误处理程序声明为一个 bean,引导就会将其连接进来。
否则将错误处理程序添加到连接工厂 bean。
使用这个错误处理程序,抛出异常将立即停止容器。