SpringCloudStream如何实现Kafka记录级别的消息过滤?
How to do message filtering at Kafka record level when using SpringCloudStream?
我正在使用 Spring Cloud Stream (SCS) 和 Kafka 作为活页夹。
我想对基于 Kafka header 的记录进行 low-level 过滤。推荐的方法是什么?
应忽略已过滤的消息并应提交偏移量。
我正在考虑配置 RecordFilterStrategy。
RecordFilterStrategy
Spring Cloud Stream 不支持。
您可以添加 ListenerContainerCustomizer
bean 并添加一个 RecordInterceptor
到侦听器容器。如果拦截器returnsnull,则不调用监听器,提交偏移量,就好像监听器被调用并正常退出一样。
我正在使用 Spring Cloud Stream (SCS) 和 Kafka 作为活页夹。
我想对基于 Kafka header 的记录进行 low-level 过滤。推荐的方法是什么?
应忽略已过滤的消息并应提交偏移量。
我正在考虑配置 RecordFilterStrategy。
RecordFilterStrategy
Spring Cloud Stream 不支持。
您可以添加 ListenerContainerCustomizer
bean RecordInterceptor
到侦听器容器。如果拦截器returnsnull,则不调用监听器,提交偏移量,就好像监听器被调用并正常退出一样。