如何优雅地关闭 运行 Kafka 消费者

How to Gracefully Turn Off Running Kafka Consumer

我需要在一些数据库驱动属性的基础上转Kafka消费者on/off。怎么可能实现。

我想到的一种方法是:当消费者标志关闭时从消费者那里抛出异常。容器工厂配置定义为

factory.setErrorHandler(new SeekToCurrentErrorHandler());

但它主动寻找相同的消息。

有什么方法可以关闭心跳然后按需重新打开。

消费者有几个 API 来控制其状态:

  • pause()/resume():允许 stop/resume 从一组分区中使用。消费者保持订阅状态(因此没有重新平衡)但在恢复之前不会获取任何新记录

  • unsubscribe(): 允许更改消费者订阅,如果没有订阅任何东西,它将保持与集群的连接。

如果您是 "done" 消费者,您也可以调用 close() 并在需要时开始一个新的

您可以 stop()start() 侦听器容器。

看来您正在使用 @KafkaListener,因为您正在使用容器工厂。

那样的话

@KafkaListener(id = "foo" ...)

然后使用 KafkaListenerEndpointRegistry bean ...

registry.getListenerContainer("foo").stop();