SpringCloudStream如何实现Kafka记录级别的消息过滤?

How to do message filtering at Kafka record level when using SpringCloudStream?

我正在使用 Spring Cloud Stream (SCS) 和 Kafka 作为活页夹。

我想对基于 Kafka header 的记录进行 low-level 过滤。推荐的方法是什么?

应忽略已过滤的消息并应提交偏移量。

我正在考虑配置 RecordFilterStrategy。

RecordFilterStrategy Spring Cloud Stream 不支持。

您可以添加 ListenerContainerCustomizer bean 并添加一个 RecordInterceptor 到侦听器容器。如果拦截器returnsnull,则不调用监听器,提交偏移量,就好像监听器被调用并正常退出一样。