Spring Integration and Kafka: 如何根据消息过滤消息header

Spring Integration and Kafka: How to filter messages based on message header

我有一个问题是基于这个问题:

我想使用 Spring Integration DSL 按 kafka 消费者记录 header 进行过滤。

目前我有这个流程:

@Bean
IntegrationFlow readTicketsFlow(KafkaProperties kafkaProperties,
                                ObjectMapper jacksonObjectMapper,
                                EventService<Ticket> service) {
    Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

    return IntegrationFlows.from(
            Kafka.messageDrivenChannelAdapter(
                    consumerFactory, TICKET_TOPIC))
            .transform(fromJson(Ticket.class, new Jackson2JsonObjectMapper(jacksonObjectMapper)))
            .handle(service)
            .get();
}

如何在此流程中注册 org.springframework.kafka.listener.adapter.RecordFilterStrategy

您只需将 .filter() 元素添加到流中即可。

.filter("!'bar'.equals(headers['foo'])")

将过滤掉(忽略)任何 header 名为 foo 等于 bar 的消息。

注意Spring Kafka 的RecordFilterStrategy 具有Spring 集成过滤器

的反向意义
public interface RecordFilterStrategy<K, V> {

    /**
     * Return true if the record should be discarded.
     * @param consumerRecord the record.
     * @return true to discard.
     */
    boolean filter(ConsumerRecord<K, V> consumerRecord);

}

Spring 集成过滤器在过滤器 returns 为假时丢弃消息。

编辑

或者您可以在通道适配器中添加一个RecordFilterStrategy

return IntegrationFlows
        .from(Kafka.messageDrivenChannelAdapter(consumerFactory(), TEST_TOPIC1)
                .recordFilterStrategy(record -> {
                    Header header = record.headers().lastHeader("foo");
                    return header != null ? new String(header.value()).equals("bar") : false;
                })
                ...