记录过滤策略问题(Spring 启动:2.3.8)。过滤后的消息一次又一次地进入过滤器
Issue with Record Filter Strategy(Spring boot : 2.3.8). Filtered messages are coming again and again to the filter
我正在研究 spring kafka 批处理监听器过滤策略。我面临一个问题,过滤事件一次又一次地出现。任何人都可以帮助我解决这个问题吗? spring 使用 kafka 版本 (2.3.8) 启动
这是我的配置:
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new
ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.getContainerProperties().setIdleBetweenPolls(30000);
factory.setRecordFilterStrategy(
(consumerRecord) -> {
MyObject myObject = new ObjectMapper().readValue(consumerRecord.value(), MyObj.class);
if (myObject.frequency > 10) {
return false;
} else {
return true;
}});
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
使用带有 MANUAL acks 的批处理模式时,如果您过滤所有记录(全部丢弃),侦听器将得到一个空列表,因此您仍然可以确认批处理以提交偏移量。
我刚刚测试了它,它按预期工作。
@SpringBootApplication
public class So67259790Application {
public static void main(String[] args) {
SpringApplication.run(So67259790Application.class, args);
}
@KafkaListener(id = "so67259790", topics = "so67259790")
public void listen(List<String> in, Acknowledgment ack) {
System.out.println(in);
ack.acknowledge();
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so67259790").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so67259790", "foo");
template.send("so67259790", "bar");
};
}
@Bean
public RecordFilterStrategy<Object, Object> rfs() {
return rec -> true;
}
}
我正在研究 spring kafka 批处理监听器过滤策略。我面临一个问题,过滤事件一次又一次地出现。任何人都可以帮助我解决这个问题吗? spring 使用 kafka 版本 (2.3.8) 启动
这是我的配置:
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new
ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.getContainerProperties().setIdleBetweenPolls(30000);
factory.setRecordFilterStrategy(
(consumerRecord) -> {
MyObject myObject = new ObjectMapper().readValue(consumerRecord.value(), MyObj.class);
if (myObject.frequency > 10) {
return false;
} else {
return true;
}});
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
使用带有 MANUAL acks 的批处理模式时,如果您过滤所有记录(全部丢弃),侦听器将得到一个空列表,因此您仍然可以确认批处理以提交偏移量。
我刚刚测试了它,它按预期工作。
@SpringBootApplication
public class So67259790Application {
public static void main(String[] args) {
SpringApplication.run(So67259790Application.class, args);
}
@KafkaListener(id = "so67259790", topics = "so67259790")
public void listen(List<String> in, Acknowledgment ack) {
System.out.println(in);
ack.acknowledge();
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so67259790").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so67259790", "foo");
template.send("so67259790", "bar");
};
}
@Bean
public RecordFilterStrategy<Object, Object> rfs() {
return rec -> true;
}
}