根据 headers 在反序列化之前过滤消息

Filter messages before deserialization based on headers

有时可以根据 header 值在反序列化之前过滤掉消息。使用 spring kafka 是否有针对此场景的任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托也将过滤器谓词也作为 属性。有什么建议么?谢谢。

是的,您可以使用 ErrorHandlingDeserializer 使用的相同技术来 return 一个“标记”对象而不是进行反序列化,然后添加一个 RecordFilterStrategy,过滤记录对于此类对象,监听器(使用 @KafkaListener 时的容器工厂或对显式监听器使用过滤适配器)。

编辑

Spring启动并添加过滤器...

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
        return factory;
    }