如何使用 Class-Level @KafkaListener 处理记录值没有 @KafkaHandler 的 Kafka 记录

How to Handle a Kafka Record with a Class-Level @KafkaListener with no @KafkaHandler for the Record Value

通常,当我们定义一个class级@KafkaListener和方法级@KafkaHandler时,我们可以定义一个默认的@KafkaHandler来处理意外的负载。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener

但是,没有默认方法怎么办?

在 2.6 及更高版本中,您可以配置 SeekToCurrentErrorHandler 通过检查异常立即将此类消息发送到死信主题。

这是一个简单的 Spring 引导应用程序,它演示了该技术:

@SpringBootApplication
public class So59256214Application {

    public static void main(String[] args) {
        SpringApplication.run(So59256214Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so59256214").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so59256214.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so59256214.DLT", topics = "so59256214.DLT")
    void listen(ConsumerRecord<?, ?> in) {
        System.out.println("dlt: " + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            template.send("so59256214", 42);
            template.send("so59256214", 42.0);
            template.send("so59256214", "No handler for this");
        };
    }

    @Bean
    ErrorHandler eh(KafkaOperations<String, Object> template) {
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template));
        BackOff neverRetryOrBackOff = new FixedBackOff(0L, 0);
        BackOff normalBackOff = new FixedBackOff(2000L, 3);
        eh.setBackOffFunction((rec, ex) -> {
            if (ex.getMessage().contains("No method found for class")) {
                return neverRetryOrBackOff;
            }
            else {
                return normalBackOff;
            }
        });
        return eh;
    }

}

@Component
@KafkaListener(id = "so59256214", topics = "so59256214")
class Listener {

    @KafkaHandler
    void integerHandler(Integer in) {
        System.out.println("int: " + in);
    }

    @KafkaHandler
    void doubleHandler(Double in) {
        System.out.println("double: " + in);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

结果:

int: 42
double: 42.0
dlt: ConsumerRecord(topic = so59256214.DLT, ...