根据 headers 在反序列化之前过滤消息
Filter messages before deserialization based on headers
有时可以根据 header 值在反序列化之前过滤掉消息。使用 spring kafka 是否有针对此场景的任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托也将过滤器谓词也作为 属性。有什么建议么?谢谢。
是的,您可以使用 ErrorHandlingDeserializer
使用的相同技术来 return 一个“标记”对象而不是进行反序列化,然后添加一个 RecordFilterStrategy
,过滤记录对于此类对象,监听器(使用 @KafkaListener
时的容器工厂或对显式监听器使用过滤适配器)。
编辑
Spring启动并添加过滤器...
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
return factory;
}
有时可以根据 header 值在反序列化之前过滤掉消息。使用 spring kafka 是否有针对此场景的任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托也将过滤器谓词也作为 属性。有什么建议么?谢谢。
是的,您可以使用 ErrorHandlingDeserializer
使用的相同技术来 return 一个“标记”对象而不是进行反序列化,然后添加一个 RecordFilterStrategy
,过滤记录对于此类对象,监听器(使用 @KafkaListener
时的容器工厂或对显式监听器使用过滤适配器)。
编辑
Spring启动并添加过滤器...
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
return factory;
}