ReplyingKafkaTemplate - sendAndReceive序列化问题
ReplyingKafkaTemplate - sendAndReceive serialisation issue
我正在尝试使用 ReplyingKafkaTemplate
到 sendAndReceive
message/response。
在执行的服务中,我使用 ProducerFactory
和 ConcurrentKafkaListenerContainerFactory
的默认实例来创建 ReplyingKafkaTemplate
的新实例,如下所示:
@Service
public class MyService {
private final ReplyingKafkaTemplate<String, MyCommand, MyResult> kafkaTemplate;
public MyService(ProducerFactory<String, MyCommand> pf,
ConcurrentKafkaListenerContainerFactory<String, MyResult> containerFactory) {
var container = containerFactory.createContainer("results topic name");
this.kafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
kafkaTemplate.start();
}
//... other methods omitted as irrelevant...
private Mono<MyResult> sendMessageAndWaitForResult(ProducerRecord<String, MyCommand> producerRecord) {
var requestReplyFuture = kafkaTemplate.sendAndReceive(producerRecord);
return Mono
.fromFuture(requestReplyFuture.completable())
.map(ConsumerRecord::value);
}
private ProducerRecord<String, MyCommand> createProducerRecord(Iterable<RecordHeader> recordHeaders, MyCommand myCommand) {
return new ProducerRecord(
"commands topic name",
null,
"some string id",
myCommand,
recordHeaders);
}
}
MyCommand
和 MyResult
是在 Lombok 的帮助下定义的 POJO。
我得到的错误是:
KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class MyCommand to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer.
我知道我没有定义序列化程序,但通过 Kafka
中的示例,他们只使用 ReplyingKafkaTemplate<String, String, String>
示例,当然他们不需要序列化程序设置。但是我做了但找不到在哪里以及如何做?
谢谢!
编辑 1:多亏了 Garry Russell 的提示,我才开始为反序列化器创建一个 Bean,但不知何故,在遇到一些错误后,我设法让它以这种方式工作:
public DefaultKafkaProducerFactory<String, MyCommand> producerFactory(
KafkaProperties properties,
AvroConverter avroConverter) {
Map<String, Object> props = properties
.buildProducerProperties();
DefaultKafkaProducerFactory pf =
new DefaultKafkaProducerFactory(
props,
new StringSerializer(),
avroConverter.valueSerde().serializer());
return pf;
}
上面应该是一个@Bean但是当我这样做的时候,我得到了下面评论中提到的错误,一些bean的依赖关系形成了一个周期。
所以这个producerFactory
我在创建的时候就通过了ReplyingKafkaTemplate
。
令人费解的是,我不必创建相同的东西来反序列化 MyResponse
?
AvroConverter
是 class 包装 SpecificAvroSerde
.
的创建
我假设您正在使用 Spring Boot;它默认配置 StringSerializer
。
KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class MyCommand to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer.
由于您发送的是 POJO,而不是字符串,因此您必须使用与 StringSerializer
不同的 Serializer
,例如 JsonSerializer
.
见Serialization, Deserialization, and Message Conversion。
使用 Spring 引导,您可以将序列化程序指定为 属性。
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
另请参阅示例
https://github.com/spring-projects/spring-kafka/tree/main/samples
我正在尝试使用 ReplyingKafkaTemplate
到 sendAndReceive
message/response。
在执行的服务中,我使用 ProducerFactory
和 ConcurrentKafkaListenerContainerFactory
的默认实例来创建 ReplyingKafkaTemplate
的新实例,如下所示:
@Service
public class MyService {
private final ReplyingKafkaTemplate<String, MyCommand, MyResult> kafkaTemplate;
public MyService(ProducerFactory<String, MyCommand> pf,
ConcurrentKafkaListenerContainerFactory<String, MyResult> containerFactory) {
var container = containerFactory.createContainer("results topic name");
this.kafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
kafkaTemplate.start();
}
//... other methods omitted as irrelevant...
private Mono<MyResult> sendMessageAndWaitForResult(ProducerRecord<String, MyCommand> producerRecord) {
var requestReplyFuture = kafkaTemplate.sendAndReceive(producerRecord);
return Mono
.fromFuture(requestReplyFuture.completable())
.map(ConsumerRecord::value);
}
private ProducerRecord<String, MyCommand> createProducerRecord(Iterable<RecordHeader> recordHeaders, MyCommand myCommand) {
return new ProducerRecord(
"commands topic name",
null,
"some string id",
myCommand,
recordHeaders);
}
}
MyCommand
和 MyResult
是在 Lombok 的帮助下定义的 POJO。
我得到的错误是:
KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class MyCommand to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer.
我知道我没有定义序列化程序,但通过 Kafka
中的示例,他们只使用 ReplyingKafkaTemplate<String, String, String>
示例,当然他们不需要序列化程序设置。但是我做了但找不到在哪里以及如何做?
谢谢!
编辑 1:多亏了 Garry Russell 的提示,我才开始为反序列化器创建一个 Bean,但不知何故,在遇到一些错误后,我设法让它以这种方式工作:
public DefaultKafkaProducerFactory<String, MyCommand> producerFactory(
KafkaProperties properties,
AvroConverter avroConverter) {
Map<String, Object> props = properties
.buildProducerProperties();
DefaultKafkaProducerFactory pf =
new DefaultKafkaProducerFactory(
props,
new StringSerializer(),
avroConverter.valueSerde().serializer());
return pf;
}
上面应该是一个@Bean但是当我这样做的时候,我得到了下面评论中提到的错误,一些bean的依赖关系形成了一个周期。
所以这个producerFactory
我在创建的时候就通过了ReplyingKafkaTemplate
。
令人费解的是,我不必创建相同的东西来反序列化 MyResponse
?
AvroConverter
是 class 包装 SpecificAvroSerde
.
我假设您正在使用 Spring Boot;它默认配置 StringSerializer
。
KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class MyCommand to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer.
由于您发送的是 POJO,而不是字符串,因此您必须使用与 StringSerializer
不同的 Serializer
,例如 JsonSerializer
.
见Serialization, Deserialization, and Message Conversion。
使用 Spring 引导,您可以将序列化程序指定为 属性。
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
另请参阅示例
https://github.com/spring-projects/spring-kafka/tree/main/samples