如何优雅地关闭 运行 Kafka 消费者
How to Gracefully Turn Off Running Kafka Consumer
我需要在一些数据库驱动属性的基础上转Kafka消费者on/off。怎么可能实现。
我想到的一种方法是:当消费者标志关闭时从消费者那里抛出异常。容器工厂配置定义为
factory.setErrorHandler(new SeekToCurrentErrorHandler());
但它主动寻找相同的消息。
有什么方法可以关闭心跳然后按需重新打开。
消费者有几个 API 来控制其状态:
pause()
/resume()
:允许 stop/resume 从一组分区中使用。消费者保持订阅状态(因此没有重新平衡)但在恢复之前不会获取任何新记录
unsubscribe()
: 允许更改消费者订阅,如果没有订阅任何东西,它将保持与集群的连接。
如果您是 "done" 消费者,您也可以调用 close()
并在需要时开始一个新的
您可以 stop()
和 start()
侦听器容器。
看来您正在使用 @KafkaListener
,因为您正在使用容器工厂。
那样的话
@KafkaListener(id = "foo" ...)
然后使用 KafkaListenerEndpointRegistry
bean ...
registry.getListenerContainer("foo").stop();
我需要在一些数据库驱动属性的基础上转Kafka消费者on/off。怎么可能实现。
我想到的一种方法是:当消费者标志关闭时从消费者那里抛出异常。容器工厂配置定义为
factory.setErrorHandler(new SeekToCurrentErrorHandler());
但它主动寻找相同的消息。
有什么方法可以关闭心跳然后按需重新打开。
消费者有几个 API 来控制其状态:
pause()
/resume()
:允许 stop/resume 从一组分区中使用。消费者保持订阅状态(因此没有重新平衡)但在恢复之前不会获取任何新记录unsubscribe()
: 允许更改消费者订阅,如果没有订阅任何东西,它将保持与集群的连接。
如果您是 "done" 消费者,您也可以调用 close()
并在需要时开始一个新的
您可以 stop()
和 start()
侦听器容器。
看来您正在使用 @KafkaListener
,因为您正在使用容器工厂。
那样的话
@KafkaListener(id = "foo" ...)
然后使用 KafkaListenerEndpointRegistry
bean ...
registry.getListenerContainer("foo").stop();