Spring Kafka consumer deserializes message as LinkedHashMap, automatically converting BigDecimal to Double
I am using an annotation-based Spring Kafka listener to consume Kafka messages. The code is below.
- Consuming the Employee object
class Employee {
private String name;
private String address;
private Object account;
// getters
// setters
}
- The account object is decided at runtime: it may be a saving account, a current account, etc.
class SavingAcc {
private BigDecimal balance;
}
class CurrentAcc {
private BigDecimal balance;
private BigDecimal limit;
}
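- The saving and current accounts have BigDecimal fields to store the balance.
- When the Employee object is sent from the Kafka producer, all fields are mapped correctly and appear in the correct format (BigDecimal, etc.).
- But when the Employee object is consumed in another service, the account object appears as a LinkedHashMap and the BigDecimal fields are converted to Double. This causes problems.
- As far as I understand, the main reason may be:
a) account is declared as Object rather than as a specific type
b) or the deserializer needs to be configured more specifically. [I already pass Employee.class as the type to the Kafka consumer's deserializer, so the Employee fields are mapped correctly but the account fields are wrong.]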
@Bean
public ConsumerFactory<String, Employee> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Employee.class));
}
I need help with how to map or correctly deserialize the account field.
Use generics and a custom JavaType method.
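To illustrate why the Double conversion matters for balances, here is a minimal sketch using only the JDK. The class name `BalancePrecisionDemo` and the helper `throughDouble` are hypothetical and not part of the code above; the helper only imitates what happens when a BigDecimal value ends up stored as a double and is read back.

```java
import java.math.BigDecimal;

class BalancePrecisionDemo {

    // Imitates a balance that was written as BigDecimal but read back as a double.
    static BigDecimal throughDouble(BigDecimal original) {
        double asDouble = original.doubleValue();
        // new BigDecimal(double) exposes the exact binary value the double holds.
        return new BigDecimal(asDouble);
    }

    public static void main(String[] args) {
        BigDecimal[] balances = {
            new BigDecimal("1.5"),                  // exactly representable in binary
            new BigDecimal("0.1"),                  // not exactly representable
            new BigDecimal("12345678901234567.89")  // more digits than a double can hold
        };
        for (BigDecimal balance : balances) {
            BigDecimal roundTripped = throughDouble(balance);
            boolean same = balance.compareTo(roundTripped) == 0;
            System.out.println(balance + " survives the double round trip: " + same);
        }
    }
}
```

Only values that happen to be exactly representable in binary come back unchanged, which is why keeping the field typed as BigDecimal on the consumer side matters.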
class Employee<T> {
private String name;
private String address;
private T account;
// getters
// setters
}
JavaType withCurrent = TypeFactory.defaultInstance().constructParametricType(Employee.class, CurrentAcc.class);
JavaType withSaving = TypeFactory.defaultInstance().constructParametricType(Employee.class, SavingAcc.class);
public static JavaType determineType(String topic, byte[] data, Headers headers) {
// If it's a current account
return withCurrent;
// else
return withSaving;
}
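If you construct the deserializer yourself, use
deser.setTypeResolver(MyClass::determineType);
When configuring with properties, use
spring.json.value.type.method=com.mycompany.MyClass.determineType
You have to inspect the data or the headers (or the topic) to determine which type you want.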
EDIT
Here is a complete example. In this case, I pass a type hint in the Account object, but an alternative would be to set a header on the producer side.
@SpringBootApplication
public class JacksonApplication {
public static void main(String[] args) {
SpringApplication.run(JacksonApplication.class, args);
}
@Data
public static class Employee<T extends Account> {
private String name;
private T account;
}
@Data
public static abstract class Account {
private final String type;
protected Account(String type) {
this.type = type;
}
}
@Data
public static class CurrentAccount extends Account {
private BigDecimal balance;
private BigDecimal limit;
public CurrentAccount() {
super("C");
}
}
@Data
public static class SavingAccount extends Account {
private BigDecimal balance;
public SavingAccount() {
super("S");
}
}
@KafkaListener(id = "empListener", topics = "employees")
public void listen(Employee<Account> e) {
System.out.println(e);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("employees").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
return args -> {
Employee<CurrentAccount> emp1 = new Employee<>();
emp1.setName("someOneWithACurrentAccount");
CurrentAccount currentAccount = new CurrentAccount();
currentAccount.setBalance(BigDecimal.ONE);
currentAccount.setLimit(BigDecimal.TEN);
emp1.setAccount(currentAccount);
template.send("employees", emp1);
Employee<SavingAccount> emp2 = new Employee<>();
emp2.setName("someOneWithASavingAccount");
SavingAccount savingAccount = new SavingAccount();
savingAccount.setBalance(BigDecimal.ONE);
emp2.setAccount(savingAccount);
template.send("employees", emp2);
};
}
private static final JavaType withCurrent = TypeFactory.defaultInstance()
.constructParametricType(Employee.class, CurrentAccount.class);
private static final JavaType withSaving = TypeFactory.defaultInstance()
.constructParametricType(Employee.class, SavingAccount.class);
public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
if (JsonPath.read(new ByteArrayInputStream(data), "$.account.type").equals("C")) {
return withCurrent;
}
else {
return withSaving;
}
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.JacksonApplication.determineType
Result
JacksonApplication.Employee(name=someOneWithACurrentAccount, account=JacksonApplication.CurrentAccount(balance=1, limit=10))
JacksonApplication.Employee(name=someOneWithASavingAccount, account=JacksonApplication.SavingAccount(balance=1))
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>jackson</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
EDIT2
Here is an example that conveys the type hint in a header instead...
@SpringBootApplication
public class JacksonApplication {
public static void main(String[] args) {
SpringApplication.run(JacksonApplication.class, args);
}
@Data
public static class Employee<T extends Account> {
private String name;
private T account;
}
@Data
public static abstract class Account {
}
@Data
public static class CurrentAccount extends Account {
private BigDecimal balance;
private BigDecimal limit;
}
@Data
public static class SavingAccount extends Account {
private BigDecimal balance;
}
@KafkaListener(id = "empListener", topics = "employees")
public void listen(Employee<Account> e) {
System.out.println(e);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("employees").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
return args -> {
Employee<CurrentAccount> emp1 = new Employee<>();
emp1.setName("someOneWithACurrentAccount");
CurrentAccount currentAccount = new CurrentAccount();
currentAccount.setBalance(BigDecimal.ONE);
currentAccount.setLimit(BigDecimal.TEN);
emp1.setAccount(currentAccount);
template.send("employees", emp1);
Employee<SavingAccount> emp2 = new Employee<>();
emp2.setName("someOneWithASavingAccount");
SavingAccount savingAccount = new SavingAccount();
savingAccount.setBalance(BigDecimal.ONE);
emp2.setAccount(savingAccount);
template.send("employees", emp2);
};
}
private static final JavaType withCurrent = TypeFactory.defaultInstance()
.constructParametricType(Employee.class, CurrentAccount.class);
private static final JavaType withSaving = TypeFactory.defaultInstance()
.constructParametricType(Employee.class, SavingAccount.class);
public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
if (headers.lastHeader("accountType").value()[0] == 'C') {
return withCurrent;
}
else {
return withSaving;
}
}
public static class MySerializer extends JsonSerializer<Employee<?>> {
@Override
public byte[] serialize(String topic, Headers headers, Employee<?> emp) {
headers.add(new RecordHeader("accountType",
new byte[] { (byte) (emp.getAccount() instanceof CurrentAccount ? 'C' : 'S')}));
return super.serialize(topic, headers, emp);
}
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=com.example.demo2.JacksonApplication.MySerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo2.JacksonApplication.determineType
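The header lookup in determineType above assumes every record carries an accountType header; Kafka's Headers.lastHeader returns null when the header is absent, so a record from another producer would cause a NullPointerException. Below is a minimal, JDK-only sketch of a more defensive decision step. It is an illustration under assumptions, not part of the example above: the class `AccountTypeHint`, the `Kind` enum, and the use of a plain `Map<String, byte[]>` as a stand-in for Kafka's Headers are all hypothetical.

```java
import java.util.Map;

class AccountTypeHint {

    enum Kind { CURRENT, SAVING, UNKNOWN }

    // Stand-in for Kafka's Headers: header name -> raw header value (an assumption for this sketch).
    static Kind fromHeaders(Map<String, byte[]> headers) {
        byte[] value = headers.get("accountType");
        // A missing or empty header must not crash the consumer.
        if (value == null || value.length == 0) {
            return Kind.UNKNOWN;
        }
        if (value[0] == 'C') {
            return Kind.CURRENT;
        }
        if (value[0] == 'S') {
            return Kind.SAVING;
        }
        return Kind.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(fromHeaders(Map.of("accountType", new byte[] { 'C' })));
        System.out.println(fromHeaders(Map.of("accountType", new byte[] { 'S' })));
        System.out.println(fromHeaders(Map.of()));
    }
}
```

In a real type resolver, the UNKNOWN case could map to a fallback JavaType or raise a descriptive exception; which of the two is appropriate depends on how the application handles bad records.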
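This annotation solved my problem:
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
private T account;
It writes the concrete class name into the JSON as an "@class" property, so Jackson can bind the field to that class when deserializing.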