Spring Kafka JsonDeserialization MessageConversionException: failed to resolve class name. Class not found

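I have two services that should communicate through Kafka. Let's call the first one WriteService and the second one QueryService.

On the WriteService side, I have the following producer configuration.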

@Configuration
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

I am trying to send an object of class com.example.project.web.routes.dto.RouteDto.

On the QueryService side, the consumer configuration is defined as follows.

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.groupid}")
    private String serviceGroupId;

    @Value("${spring.kafka.consumer.trusted-packages}")
    private String trustedPackage;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

The listener is defined as follows. The payload class has the fully qualified name com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto

@KafkaListener(topics = "${spring.kafka.topics.routes}",
        containerFactory = "kafkaListenerContainerFactory")
public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
                           @Payload RouteDto payload) {
    logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
            typeIdHeader(cr.headers()), payload, cr.toString());
}

private static String typeIdHeader(Headers headers) {
    return StreamSupport.stream(headers.spliterator(), false)
            .filter(header -> header.key().equals("__TypeId__"))
            .findFirst().map(header -> new String(header.value())).orElse("N/A");
}

When a message is sent, I get the following error:

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.project.web.routes.dto.RouteDto]; nested exception is java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto

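The error itself is clear. What I don't understand is why this is the default behavior. I don't want to have the same package in different services; that makes no sense at all.

I have not found a way to disable this and use the class supplied to the listener, the one annotated with @Payload.

How can I solve this without manually configuring a mapper?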
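The class name in the exception is the producer-side name carried in the `__TypeId__` header, the same header the listener above reads in typeIdHeader. The following plain-Java sketch illustrates the idea only; it is not Spring Kafka's actual implementation. `TypeHeaderSketch`, `LocalRouteDto` and `resolveTargetType` are hypothetical names invented for this illustration, and the flag simply mirrors the `useHeadersIfPresent` option quoted in the answer.

```java
import java.util.Map;

class TypeHeaderSketch {

    /** Hypothetical stand-in for the consumer-side payload class. */
    static class LocalRouteDto { }

    /**
     * Simplified model of header-driven type resolution: when the type header
     * is present and honored, the class is loaded by the name the producer sent;
     * otherwise the locally supplied default type is used.
     */
    static Class<?> resolveTargetType(Map<String, String> headers,
                                      Class<?> defaultType,
                                      boolean useHeadersIfPresent) {
        String typeId = headers.get("__TypeId__");
        if (useHeadersIfPresent && typeId != null) {
            try {
                // Fails on the consumer when the producer's package does not exist there.
                return Class.forName(typeId);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(
                        "failed to resolve class name. Class not found [" + typeId + "]", e);
            }
        }
        return defaultType;
    }

    public static void main(String[] args) {
        Map<String, String> headers =
                Map.of("__TypeId__", "com.example.project.web.routes.dto.RouteDto");

        // Header ignored: the local class is used.
        System.out.println(resolveTargetType(headers, LocalRouteDto.class, false));

        // Header honored: the producer's class name cannot be loaded here.
        try {
            resolveTargetType(headers, LocalRouteDto.class, true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Seen this way, the fix is either to stop honoring the header on the consumer or to convert the payload to the listener's declared type.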

If you are using spring-kafka-2.2.x, you can disable the default use of the type header through an overloaded constructor of JsonDeserializer. From the docs:

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default). The following example shows how to do so:

DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
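Applied to the configuration in the question, this might look like the sketch below. It assumes spring-kafka 2.2 or later, that `RouteDto` is the consumer-side class (com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto), and that `consumerConfigs()` stays as defined in the question; only the two factory beans inside `KafkaConsumerConfiguration` change.

```java
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Fragment: these beans replace the ones in KafkaConsumerConfiguration from the question.

@Bean
public ConsumerFactory<String, RouteDto> consumerFactory() {
    // false = ignore the type header and always deserialize into the local RouteDto
    JsonDeserializer<RouteDto> valueDeserializer =
            new JsonDeserializer<>(RouteDto.class, false);
    // Deserializer instances passed here are used instead of the
    // *_DESERIALIZER_CLASS_CONFIG entries in the properties map.
    return new DefaultKafkaConsumerFactory<>(
            consumerConfigs(), new StringDeserializer(), valueDeserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, RouteDto> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
```

The listener in the question can then stay unchanged, since its containerFactory attribute already refers to kafkaListenerContainerFactory.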

If you are on a lower version, use a MessageConverter (you may see this issue in spring-kafka-2.1.x and later). From the docs:

Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter and BytesJsonMessageConverter customization. You can inject the MessageConverter into a KafkaTemplate instance directly and by using AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

@KafkaListener(topics = "jsonData",
        containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(RouteDto dto) {
    ...
}

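Note: this type inference is possible only when the @KafkaListener annotation is declared at the method level. With a class-level @KafkaListener, the payload type is used to select which @KafkaHandler method to invoke, so the payload must already have been converted before the method is chosen.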