Kafka 序列化程序与 DeadLetterPublishingRecoverer 中的 Spring 消息转换器
Kafka Serializers vs Spring Message Converters in DeadLetterPublishingRecoverer
我对spring-kafka中的序列化器和消息转换器有点困惑。什么时候应该使用 Spring 消息转换器,什么时候只使用 Kafka 序列化器?据我在 Spring 中看到的,首选方法是使用 StringSerializer
配置 Kafka 客户端 key/value 序列化程序,然后在 KafkaTemplate
上配置消息转换器以用于实际POJO 到字符串的转换。对吗?
我一直在尝试配置一个 DeadLetterPublishingRecoverer
,它应该向 DLT 发送消息以解决反序列化错误以及处理反序列化消息时的任何错误。问题是当消息已经被反序列化时我需要一个 JsonSerializer
但当消息无法被反序列化时我只需要一个简单的 StringSerializer
。有什么配置方法吗?
模板中的消息转换仅适用于采用 Message<?>
.
的 send()
方法
使用采用模板映射的 DLPR 构造函数之一:
/**
* Create an instance with the provided templates and a default destination resolving
* function that returns a TopicPartition based on the original topic (appended with
* ".DLT") from the failed record, and the same partition as the failed record.
* Therefore the dead-letter topic must have at least as many partitions as the
* original topic. The templates map keys are classes and the value the corresponding
* template to use for objects (producer record values) of that type. A
* {@link java.util.LinkedHashMap} is recommended when there is more than one
* template, to ensure the map is traversed in order. To send records with a null
* value, add a template with the {@link Void} class as a key; otherwise the first
* template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
*/
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
this(templates, DEFAULT_DESTINATION_RESOLVER);
}
为反序列化异常添加一个为 byte[]
值配置的模板(在其生产者上有一个 ByteArraySerializer
)。
我对spring-kafka中的序列化器和消息转换器有点困惑。什么时候应该使用 Spring 消息转换器,什么时候只使用 Kafka 序列化器?据我在 Spring 中看到的,首选方法是使用 StringSerializer
配置 Kafka 客户端 key/value 序列化程序,然后在 KafkaTemplate
上配置消息转换器以用于实际POJO 到字符串的转换。对吗?
我一直在尝试配置一个 DeadLetterPublishingRecoverer
,它应该向 DLT 发送消息以解决反序列化错误以及处理反序列化消息时的任何错误。问题是当消息已经被反序列化时我需要一个 JsonSerializer
但当消息无法被反序列化时我只需要一个简单的 StringSerializer
。有什么配置方法吗?
模板中的消息转换仅适用于采用 Message<?>
.
send()
方法
使用采用模板映射的 DLPR 构造函数之一:
/**
* Create an instance with the provided templates and a default destination resolving
* function that returns a TopicPartition based on the original topic (appended with
* ".DLT") from the failed record, and the same partition as the failed record.
* Therefore the dead-letter topic must have at least as many partitions as the
* original topic. The templates map keys are classes and the value the corresponding
* template to use for objects (producer record values) of that type. A
* {@link java.util.LinkedHashMap} is recommended when there is more than one
* template, to ensure the map is traversed in order. To send records with a null
* value, add a template with the {@link Void} class as a key; otherwise the first
* template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
*/
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
this(templates, DEFAULT_DESTINATION_RESOLVER);
}
为反序列化异常添加一个为 byte[]
值配置的模板(在其生产者上有一个 ByteArraySerializer
)。