将 ConsumerRebalanceListener 添加到 ConcurrentKafkaListenerContainerFactory

Adding ConsumerRebalanceListener to the ConcurrentKafkaListenerContainerFactory

在 Spring 引导应用程序中,我使用了一个 class 注释为 @KafkaListener 的消息侦听器。我想将 ConsumerRebalanceLister 添加到我的应用程序以管理重新平衡时的缓存数据。

如何将 ConsumerRebalanceListener 添加到 ConcurrentKafkaListenerContainerFactory。 documentation says that it should be set on a ContainerProperties object. It's not clear how to access that object in order to set it. Additionally, it looks like the ConcurrentKafkaListenerContainerFactory 丢弃了再平衡监听器,因为它在创建监听器容器实例时创建了一个新的 ContainerProperties 对象。

我觉得我在这里遗漏了一些非常明显的东西,在 this commit 之前有一种方法可以直接在 ConcurrentKafkaListenerContainerFactory 上简单地设置再平衡侦听器。

考虑在 ConcurrentKafkaListenerContainerFactory:

上使用此方法
/**
 * Obtain the properties template for this factory - set properties as needed
 * and they will be copied to a final properties instance for the endpoint.
 * @return the properties.
 */
public ContainerProperties getContainerProperties() {

这是您可以添加 ConsumerRebalanceListener 的地方。您 @Autowired 自动配置 ConcurrentKafkaListenerContainerFactory 并执行上述注入:

@Autowired
private ConcurrentKafkaListenerContainerFactory containerFactory;

@PostConstruct
public void init() {
    this.containerFactory.getContainerProperties()
            .setConsumerRebalanceListener(myConsumerRebalanceListener());
}

@Bean
public ConsumerRebalanceListener myConsumerRebalanceListener() {
    return new ConsumerRebalanceListener() {
        ...
    };
}