Spring Integration and Kafka: 如何根据消息过滤消息header
Spring Integration and Kafka: How to filter messages based on message header
我有一个问题是基于这个问题:
我想使用 Spring Integration DSL 按 kafka 消费者记录 header 进行过滤。
目前我有这个流程:
@Bean
IntegrationFlow readTicketsFlow(KafkaProperties kafkaProperties,
ObjectMapper jacksonObjectMapper,
EventService<Ticket> service) {
Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
return IntegrationFlows.from(
Kafka.messageDrivenChannelAdapter(
consumerFactory, TICKET_TOPIC))
.transform(fromJson(Ticket.class, new Jackson2JsonObjectMapper(jacksonObjectMapper)))
.handle(service)
.get();
}
如何在此流程中注册 org.springframework.kafka.listener.adapter.RecordFilterStrategy
?
您只需将 .filter()
元素添加到流中即可。
.filter("!'bar'.equals(headers['foo'])")
将过滤掉(忽略)任何 header 名为 foo
等于 bar
的消息。
注意Spring Kafka 的RecordFilterStrategy
具有Spring 集成过滤器
的反向意义
public interface RecordFilterStrategy<K, V> {
/**
* Return true if the record should be discarded.
* @param consumerRecord the record.
* @return true to discard.
*/
boolean filter(ConsumerRecord<K, V> consumerRecord);
}
Spring 集成过滤器在过滤器 returns 为假时丢弃消息。
编辑
或者您可以在通道适配器中添加一个RecordFilterStrategy
。
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), TEST_TOPIC1)
.recordFilterStrategy(record -> {
Header header = record.headers().lastHeader("foo");
return header != null ? new String(header.value()).equals("bar") : false;
})
...
我有一个问题是基于这个问题:
我想使用 Spring Integration DSL 按 kafka 消费者记录 header 进行过滤。
目前我有这个流程:
@Bean
IntegrationFlow readTicketsFlow(KafkaProperties kafkaProperties,
ObjectMapper jacksonObjectMapper,
EventService<Ticket> service) {
Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
return IntegrationFlows.from(
Kafka.messageDrivenChannelAdapter(
consumerFactory, TICKET_TOPIC))
.transform(fromJson(Ticket.class, new Jackson2JsonObjectMapper(jacksonObjectMapper)))
.handle(service)
.get();
}
如何在此流程中注册 org.springframework.kafka.listener.adapter.RecordFilterStrategy
?
您只需将 .filter()
元素添加到流中即可。
.filter("!'bar'.equals(headers['foo'])")
将过滤掉(忽略)任何 header 名为 foo
等于 bar
的消息。
注意Spring Kafka 的RecordFilterStrategy
具有Spring 集成过滤器
public interface RecordFilterStrategy<K, V> {
/**
* Return true if the record should be discarded.
* @param consumerRecord the record.
* @return true to discard.
*/
boolean filter(ConsumerRecord<K, V> consumerRecord);
}
Spring 集成过滤器在过滤器 returns 为假时丢弃消息。
编辑
或者您可以在通道适配器中添加一个RecordFilterStrategy
。
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), TEST_TOPIC1)
.recordFilterStrategy(record -> {
Header header = record.headers().lastHeader("foo");
return header != null ? new String(header.value()).equals("bar") : false;
})
...