将 json 负载发送到 Spring 中的 Apache Kafka 主题
Send a json payload to Apache Kafka topic in Spring
我需要将 (Post) Json 有效载荷发送到 Apache Kafka 主题,但我收到以下错误:-
"message": "Can't convert value of class com.xyz.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer"
同时 Spring 显示 class 转换异常:-
java.lang.ClassCastException: com.xyz.User cannot be cast to java.lang.String
以下是我的模型(model)类、Kafka 配置和控制器
/**
 * Plain data holder for a user, carrying a first name and an e-mail address.
 *
 * <p>Exposes a no-argument constructor plus getters and setters for both fields.
 */
public class User {

    private String firstname;
    private String email;

    /** Creates a user with both fields left unset ({@code null}). */
    public User() {
    }

    /**
     * Creates a user with the given values.
     *
     * @param firstname the user's first name
     * @param email the user's e-mail address
     */
    public User(String firstname, String email) {
        this.firstname = firstname;
        this.email = email;
    }

    /** @return the first name, or {@code null} if it was never set */
    public String getFirstname() {
        return this.firstname;
    }

    /** @param firstname the new first name */
    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    /** @return the e-mail address, or {@code null} if it was never set */
    public String getEmail() {
        return this.email;
    }

    /** @param email the new e-mail address */
    public void setEmail(String email) {
        this.email = email;
    }

    /** @return a one-line text form listing both fields */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UserModel [firstname=");
        text.append(this.firstname);
        text.append(", email=");
        text.append(this.email);
        text.append("]");
        return text.toString();
    }
}
Kafka 配置
package config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.xyz.User;
/**
 * Spring configuration that declares the Kafka producer beans for {@link User} messages.
 */
@Configuration
public class KafkaConfiguration {

    /**
     * Builds the producer factory: broker at {@code 127.0.0.1:9092}, keys written with
     * {@link StringSerializer}, values written with {@link JsonSerializer}.
     *
     * @return a producer factory for String keys and {@link User} values
     */
    @Bean
    public ProducerFactory<String, User> producerFactory() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Builds the template used to send {@link User} values, backed by {@link #producerFactory()}.
     *
     * @return a Kafka template for String keys and {@link User} values
     */
    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        final ProducerFactory<String, User> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
控制器
@RestController
@RequestMapping("/kafka")
public class UserController {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
private static String TOPIC = "kafka-producer";
@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
kafkaTemplate.send(TOPIC, user);
通过 Postman 发送的 JSON 负载
{
"firstname" : "xyz",
"email" : "xyz@gmail.com.com"
}
以下对我有用。
public class UserController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static String TOPIC = "kafka-producer";
@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = null;
try {
json = ow.writeValueAsString(user);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
kafkaTemplate.send(TOPIC, json);
}
我能够在本地环境中重现您的错误。
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.demo.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: com.example.demo.User cannot be cast to java.lang.String
如果查看日志,会发现 value.serializer 的值仍然是默认的 StringSerializer,而不是预期的 JsonSerializer,这说明您的生产者配置没有生效。简而言之,您的 KafkaConfiguration 类没有被加载。
您的 KafkaConfiguration 类位于 config 包中,而 User 类位于 com.xyz 包中。因此解决办法是确保 Spring 能够加载您的配置。很可能 config 包没有被扫描,其中的配置和 bean 定义因此未被发现。如果将 KafkaConfiguration 移动到应用程序的根包中,您的原始代码应该就可以正常工作。
您可能认为您的 KafkaTemplate 对象已经被注入,但实际上并没有;被注入的是由 Spring Kafka 自动配置(Autoconfiguration)定义的那个。
服务
/**
 * Service that forwards JSON messages to Kafka through the injected template.
 */
@Service
public class KafkaSenderService {

    @Autowired
    private KafkaTemplate<String, JsonNode> kafkaTemplate;

    /**
     * Sends one message to the given topic.
     *
     * @param kafkaTopic name of the destination topic
     * @param message JSON node to send as the record value
     */
    public void send(String kafkaTopic, JsonNode message) {
        this.kafkaTemplate.send(kafkaTopic, message);
    }
}
控制器
// Pass every element of datas, together with topicName, to kafkaSender.send — one call per element.
// NOTE(review): kafkaSender is presumably a KafkaSenderService instance; confirm against the caller.
for (JsonNode data: datas) {
kafkaSender.send(topicName,data);
}
application.properties
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
我需要将 (Post) Json 有效载荷发送到 Apache Kafka 主题,但我收到以下错误:- "message": "Can't convert value of class com.xyz.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer"
同时 Spring 显示 class 转换异常:- java.lang.ClassCastException: com.xyz.User cannot be cast to java.lang.String
以下是我的模型(model)类、Kafka 配置和控制器
/**
 * Plain data holder for a user, carrying a first name and an e-mail address.
 *
 * <p>Exposes a no-argument constructor plus getters and setters for both fields.
 */
public class User {

    private String firstname;
    private String email;

    /** Creates a user with both fields left unset ({@code null}). */
    public User() {
    }

    /**
     * Creates a user with the given values.
     *
     * @param firstname the user's first name
     * @param email the user's e-mail address
     */
    public User(String firstname, String email) {
        this.firstname = firstname;
        this.email = email;
    }

    /** @return the first name, or {@code null} if it was never set */
    public String getFirstname() {
        return this.firstname;
    }

    /** @param firstname the new first name */
    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    /** @return the e-mail address, or {@code null} if it was never set */
    public String getEmail() {
        return this.email;
    }

    /** @param email the new e-mail address */
    public void setEmail(String email) {
        this.email = email;
    }

    /** @return a one-line text form listing both fields */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UserModel [firstname=");
        text.append(this.firstname);
        text.append(", email=");
        text.append(this.email);
        text.append("]");
        return text.toString();
    }
}
Kafka 配置
package config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.xyz.User;
/**
 * Spring configuration that declares the Kafka producer beans for {@link User} messages.
 */
@Configuration
public class KafkaConfiguration {

    /**
     * Builds the producer factory: broker at {@code 127.0.0.1:9092}, keys written with
     * {@link StringSerializer}, values written with {@link JsonSerializer}.
     *
     * @return a producer factory for String keys and {@link User} values
     */
    @Bean
    public ProducerFactory<String, User> producerFactory() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Builds the template used to send {@link User} values, backed by {@link #producerFactory()}.
     *
     * @return a Kafka template for String keys and {@link User} values
     */
    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        final ProducerFactory<String, User> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
控制器
@RestController
@RequestMapping("/kafka")
public class UserController {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
private static String TOPIC = "kafka-producer";
@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
kafkaTemplate.send(TOPIC, user);
通过 Postman 发送的 JSON 负载
{
"firstname" : "xyz",
"email" : "xyz@gmail.com.com"
}
以下对我有用。
public class UserController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static String TOPIC = "kafka-producer";
@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = null;
try {
json = ow.writeValueAsString(user);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
kafkaTemplate.send(TOPIC, json);
}
我能够在本地环境中重现您的错误。
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.demo.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: com.example.demo.User cannot be cast to java.lang.String
如果查看日志,会发现 value.serializer 的值仍然是默认的 StringSerializer,而不是预期的 JsonSerializer,这说明您的生产者配置没有生效。简而言之,您的 KafkaConfiguration 类没有被加载。
您的 KafkaConfiguration 类位于 config 包中,而 User 类位于 com.xyz 包中。因此解决办法是确保 Spring 能够加载您的配置。很可能 config 包没有被扫描,其中的配置和 bean 定义因此未被发现。如果将 KafkaConfiguration 移动到应用程序的根包中,您的原始代码应该就可以正常工作。
您可能认为您的 KafkaTemplate 对象已经被注入,但实际上并没有;被注入的是由 Spring Kafka 自动配置(Autoconfiguration)定义的那个。
服务
/**
 * Service that forwards JSON messages to Kafka through the injected template.
 */
@Service
public class KafkaSenderService {

    @Autowired
    private KafkaTemplate<String, JsonNode> kafkaTemplate;

    /**
     * Sends one message to the given topic.
     *
     * @param kafkaTopic name of the destination topic
     * @param message JSON node to send as the record value
     */
    public void send(String kafkaTopic, JsonNode message) {
        this.kafkaTemplate.send(kafkaTopic, message);
    }
}
控制器
// Pass every element of datas, together with topicName, to kafkaSender.send — one call per element.
// NOTE(review): kafkaSender is presumably a KafkaSenderService instance; confirm against the caller.
for (JsonNode data: datas) {
kafkaSender.send(topicName,data);
}
application.properties
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer