Spring Boot Kafka 通用 JSON templateSender

spring boot kafka generic JSON templateSender

我正在使用 Kafka 和 Spring Boot,我需要把 JSON 对象发送到 Kafka。问题在于:我通过配置 KafkaTemplate 已经能够把一个对象作为 JSON 发送,但这个配置只针对这一个对象类型。

package com.bankia.apimanager.config;

import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {

    /** Kafka broker list, taken from application properties. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer settings: String keys, RequestDTO values serialized as JSON.
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return config;
    }

    /** Factory producing RequestDTO-valued producers from the config above. */
    @Bean
    public ProducerFactory<String, RequestDTO> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** Template bound to RequestDTO only — the limitation the question is about. */
    @Bean
    public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
package com.bankia.apimanager.controller;

import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {

    private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );

    private static final String TOPIC = "test";

    @Autowired
    private KafkaTemplate<String, RequestDTO> sender;

    /**
     * Sends a sample RequestDTO to the "test" topic and returns immediately.
     * The send is asynchronous; success or failure is only reported via the
     * logging callback, never to the HTTP caller.
     */
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String postMessage(){

        ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
        future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
            @Override
            public void onSuccess(SendResult<String, RequestDTO> result) {
                // Parameterized logging: no string built when INFO is disabled.
                LOG.info("Sent message with offset=[{}]", result.getRecordMetadata().offset());
            }
            @Override
            public void onFailure(Throwable ex) {
                // Pass the Throwable as the last argument so SLF4J logs the
                // full stack trace (the original logged only ex.getMessage()).
                LOG.error("Unable to send message due to : {}", ex.getMessage(), ex);
            }
        });
        return "OK";
    }
}

但是如果现在我想发送一个新的 DTO 对象呢?我是否必须为每种对象声明一个新的 KafkaTemplate&lt;String, NEWOBJECT&gt;,并逐个自动装配在配置中为每种对象声明的 kafka 模板?有没有另一种方法:只声明一个 kafkaTemplate,用它发送任何类型的对象并自动序列化为 JSON?

我认为,您可以指定一个泛型 KafkaTemplate<String, Object> 并将生产者值序列化程序设置为 JsonSerializer,如下所示:

@Configuration
public class KafkaConfiguration {

    /** Broker list injected from spring.kafka.bootstrap-servers. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Producer settings: String keys, any value serialized to JSON. */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return settings;
    }

    /** Generic factory: the Object value type lets one producer serve all DTOs. */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** Single template able to publish any payload type as JSON. */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

引用您的代码:

  • Value Serializer 被正确定义为 JsonSerializer,它将任何类型的对象转换为 JSON。
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}
  • 在 KafkaConfig 和 Controller 中的每个用到的地方,将 KafkaTemplate&lt;String, RequestDTO&gt; 更改为 KafkaTemplate&lt;String, Object&gt;(原文的泛型参数疑似被当作 HTML 标签吞掉了)。

Keep in mind that generics remain until compile time (type erasure) only.

有两种情况:

场景 #1

如果你想使用 KafkaTemplate 将任何类型(如你的问题中提到的)发送到 kafka,那么就不需要声明你自己的 KafkaTemplate bean,因为 Spring Boot 已经在 KafkaAutoConfiguration 中为你完成了此操作:

package org.springframework.boot.autoconfigure.kafka;

...

// Verbatim excerpt of Spring Boot's own auto-configuration (comments added for review).
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    // Backs off (@ConditionalOnMissingBean) as soon as the application defines
    // its own KafkaTemplate bean of ANY generic parameterization.
    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        // Raw template over the auto-configured factory; converter only applied
        // when exactly one RecordMessageConverter bean exists (ifUnique).
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }
}


**Some Note**:

  1. 此配置 class 已使用 @ConditionalOnClass(KafkaTemplate.class) 注释,这意味着:(来自 spring 文档--->)@Conditional 仅在指定时匹配classes 在 class 路径上。

  2. kafkaTemplate bean 方法注释为 @ConditionalOnMissingBean(KafkaTemplate.class) 这意味着:(来自 spring 文档 ---->)@Conditional 仅当 BeanFactory 中已不包含满足指定要求的 bean 时才匹配。

  3. 重要! 在纯 java 世界中,KafkaTemplate<?, ?> 不是子类型,例如:KafkaTemplate<String, RequestDTO> 所以你不能这样做:

KafkaTemplate<?, ?> kf1 = ...;
KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error

因为 java 参数化类型是不变的,如 Effective Java 第三版第 31 条所述。但是 spring world 没问题,将被注入到您自己的服务中。您只需要在 kafkaTemplate 属性上指定您自己的通用类型。 例如:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    /** Both fields resolve to the single auto-configured KafkaTemplate<?, ?> bean. */
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate1;
    // Renamed from "KafkaTemplate2": Java fields use lowerCamelCase.
    @Autowired
    private KafkaTemplate<Integer, RequestDTO> kafkaTemplate2;
}

场景#2

如果您需要限制 kafka 记录的值类型,那么您需要像这样指定您自己的 kafka bean:


@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(CorridorTracingConfiguration.class)
public class CorridorKafkaAutoConfiguration {
    
    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
                                                              ProducerListener<Object, AbstractMessage> kafkaProducerListener,
                                                              ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

现在只能注入 KafkaTemplate&lt;String, AbstractMessage&gt; kafkaTemplate(key 的类型也可以换成 String 以外的任何类型)。通过它你可以把 AbstractMessage 的任何子类型发送到 kafka。

用法示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {

    @Autowired
    private KafkaTemplate<String, AbstractMessage> kafkaTemplate;

    /** Publishes a FraudRequest built from the transaction, keyed by source account. */
    public void makeTrx(TrxRequest trxRequest) {
        String key = trxRequest.fromAccountNumber();
        FraudRequest payload = new FraudRequest(trxRequest);
        kafkaTemplate.send("fraud-request", key, payload);
    }

}

// Lombok-generated accessors, equals/hashCode and toString; callSuper=true
// includes AbstractMessage's fields in equality checks and string output.
@Accessors(chain = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class FraudRequest extends AbstractMessage {
    // NOTE(review): float for a monetary amount loses precision — consider BigDecimal.
    private float amount;
    private String fromAccountNumber;
    private String toAccountNumber;

...

}

限制kafka消息的key跟上面的一样