Spring Kafka 自定义解串器
Spring Kafka Custom Deserializer
我正在按照此链接中列出的步骤创建自定义解串器。我从 Kafka 收到的消息在 JSON 字符串之前带有纯文本 "log message -"。我希望反序列化器忽略这段前缀字符串,只解析 JSON 数据。有办法做到吗?
应用程序
/**
 * Spring Boot entry point of the transaction-audit service. It also declares the
 * bean that listens on the {@code ctp_verbose} Kafka topic.
 */
@SpringBootApplication
public class TransactionauditServiceApplication {

    /**
     * Kafka listener that prints the {@code traceId} value of every received
     * message together with the partition it was read from.
     */
    public static class MessageListener {

        /**
         * Handles one record delivered by the {@code kafkaListenerContainerFactory} container.
         *
         * @param message   payload already converted to {@link ConciseMessage}
         * @param partition value of the {@code RECEIVED_PARTITION_ID} header
         */
        @KafkaListener(topics = "ctp_verbose", containerFactory = "kafkaListenerContainerFactory")
        public void listen(@Payload ConciseMessage message,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            String line = "Received Messasge in group foo: "
                    + message.getStringValue("traceId")
                    + " partion "
                    + partition;
            System.out.println(line);
        }
    }

    /**
     * Registers the listener as a bean so the {@code @KafkaListener} method is picked up.
     *
     * @return a new {@link MessageListener}
     */
    @Bean
    public MessageListener messageListener() {
        MessageListener listener = new MessageListener();
        return listener;
    }

    /**
     * Builds the application with {@code web(false)} and runs it.
     *
     * @param args command-line arguments handed to Spring unchanged
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplicationBuilder builder =
                new SpringApplicationBuilder(TransactionauditServiceApplication.class).web(false);
        builder.run(args);
    }
}
消费者配置
/**
 * Kafka consumer wiring: a consumer factory with String keys and JSON-decoded
 * {@link ConciseMessage} values, plus the listener container factory built on it.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Broker list; taken from {@code kafka.bootstrapAddress}, default {@code localhost:9092}. */
    @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
    private String bootstrapAddress;

    /** Consumer group id; taken from {@code groupId}, default {@code audit}. */
    @Value(value = "${groupId:audit}")
    private String groupId;

    /**
     * Creates the consumer factory from the bootstrap address and group id.
     *
     * @return factory using a {@code StringDeserializer} for keys and a
     *         {@code JsonDeserializer} targeting {@link ConciseMessage} for values
     */
    @Bean
    public ConsumerFactory<String, ConciseMessage> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<ConciseMessage> valueDeserializer = new JsonDeserializer<>(ConciseMessage.class);
        return new DefaultKafkaConsumerFactory<>(consumerProps, keyDeserializer, valueDeserializer);
    }

    /**
     * Creates the container factory that {@code @KafkaListener} methods refer to by name.
     *
     * @return a concurrent listener container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, ConciseMessage> source = consumerFactory();
        containerFactory.setConsumerFactory(source);
        return containerFactory;
    }
}
通过编写 new JsonDeserializer<>(ConciseMessage.class) 这一行,您只是告诉 Kafka 您想将消息转换为 ConciseMessage 类型,这并不能使其成为自定义反序列化器。要解决您的问题,您很可能需要编写自己的解串器实现,并在其中加入剥离 "log message -" 文本的逻辑。
我正在按照此链接中列出的步骤创建自定义解串器。我从 Kafka 收到的消息在 JSON 字符串之前带有纯文本 "log message -"。我希望反序列化器忽略这段前缀字符串,只解析 JSON 数据。有办法做到吗?
应用程序
/**
 * Spring Boot entry point of the transaction-audit service. It also declares the
 * bean that listens on the {@code ctp_verbose} Kafka topic.
 */
@SpringBootApplication
public class TransactionauditServiceApplication {

    /**
     * Kafka listener that prints the {@code traceId} value of every received
     * message together with the partition it was read from.
     */
    public static class MessageListener {

        /**
         * Handles one record delivered by the {@code kafkaListenerContainerFactory} container.
         *
         * @param message   payload already converted to {@link ConciseMessage}
         * @param partition value of the {@code RECEIVED_PARTITION_ID} header
         */
        @KafkaListener(topics = "ctp_verbose", containerFactory = "kafkaListenerContainerFactory")
        public void listen(@Payload ConciseMessage message,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            String line = "Received Messasge in group foo: "
                    + message.getStringValue("traceId")
                    + " partion "
                    + partition;
            System.out.println(line);
        }
    }

    /**
     * Registers the listener as a bean so the {@code @KafkaListener} method is picked up.
     *
     * @return a new {@link MessageListener}
     */
    @Bean
    public MessageListener messageListener() {
        MessageListener listener = new MessageListener();
        return listener;
    }

    /**
     * Builds the application with {@code web(false)} and runs it.
     *
     * @param args command-line arguments handed to Spring unchanged
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplicationBuilder builder =
                new SpringApplicationBuilder(TransactionauditServiceApplication.class).web(false);
        builder.run(args);
    }
}
消费者配置
/**
 * Kafka consumer wiring: a consumer factory with String keys and JSON-decoded
 * {@link ConciseMessage} values, plus the listener container factory built on it.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Broker list; taken from {@code kafka.bootstrapAddress}, default {@code localhost:9092}. */
    @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
    private String bootstrapAddress;

    /** Consumer group id; taken from {@code groupId}, default {@code audit}. */
    @Value(value = "${groupId:audit}")
    private String groupId;

    /**
     * Creates the consumer factory from the bootstrap address and group id.
     *
     * @return factory using a {@code StringDeserializer} for keys and a
     *         {@code JsonDeserializer} targeting {@link ConciseMessage} for values
     */
    @Bean
    public ConsumerFactory<String, ConciseMessage> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<ConciseMessage> valueDeserializer = new JsonDeserializer<>(ConciseMessage.class);
        return new DefaultKafkaConsumerFactory<>(consumerProps, keyDeserializer, valueDeserializer);
    }

    /**
     * Creates the container factory that {@code @KafkaListener} methods refer to by name.
     *
     * @return a concurrent listener container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, ConciseMessage> source = consumerFactory();
        containerFactory.setConsumerFactory(source);
        return containerFactory;
    }
}
通过编写 new JsonDeserializer<>(ConciseMessage.class) 这一行,您只是告诉 Kafka 您想将消息转换为 ConciseMessage 类型,这并不能使其成为自定义反序列化器。要解决您的问题,您很可能需要编写自己的解串器实现,并在其中加入剥离 "log message -" 文本的逻辑。