Kafka 序列化程序与 DeadLetterPublishingRecoverer 中的 Spring 消息转换器

Kafka Serializers vs Spring Message Converters in DeadLetterPublishingRecoverer

我对spring-kafka中的序列化器和消息转换器有点困惑。什么时候应该使用 Spring 消息转换器,什么时候只使用 Kafka 序列化器?据我在 Spring 中看到的,首选方法是使用 StringSerializer 配置 Kafka 客户端 key/value 序列化程序,然后在 KafkaTemplate 上配置消息转换器以用于实际POJO 到字符串的转换。对吗?

我一直在尝试配置一个 DeadLetterPublishingRecoverer,它应该向 DLT 发送消息以解决反序列化错误以及处理反序列化消息时的任何错误。问题是当消息已经被反序列化时我需要一个 JsonSerializer 但当消息无法被反序列化时我只需要一个简单的 StringSerializer 。有什么配置方法吗?

模板中的消息转换仅适用于采用 Message<?>.

send() 方法

使用采用模板映射的 DLPR 构造函数之一:

    /**
     * Create an instance with the provided templates and a default destination resolving
     * function that returns a TopicPartition based on the original topic (appended with
     * ".DLT") from the failed record, and the same partition as the failed record.
     * Therefore the dead-letter topic must have at least as many partitions as the
     * original topic. The templates map keys are classes and the value the corresponding
     * template to use for objects (producer record values) of that type. A
     * {@link java.util.LinkedHashMap} is recommended when there is more than one
     * template, to ensure the map is traversed in order. To send records with a null
     * value, add a template with the {@link Void} class as a key; otherwise the first
     * template from the map values iterator will be used.
     * @param templates the {@link KafkaOperations}s to use for publishing.
     */
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
        this(templates, DEFAULT_DESTINATION_RESOLVER);
    }

为反序列化异常添加一个为 byte[] 值配置的模板(在其生产者上有一个 ByteArraySerializer)。