spring 启动 kafka 通用 JSON templateSender
spring boot kafka generic JSON templateSender
我正在使用 kafka 和 spring 启动,我需要将 JSON 对象发送到 kafka,关键是我能够发送一个对象作为 JSON 配置 KafkaTemplate但仅针对此对象。
package com.bankia.apimanager.config;
import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, RequestDTO> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.bankia.apimanager.controller;
import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {
private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );
private static final String TOPIC = "test";
@Autowired
private KafkaTemplate<String, RequestDTO> sender;
@RequestMapping(value = "/test", method = RequestMethod.GET)
public String postMessage(){
ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
@Override
public void onSuccess(SendResult<String, RequestDTO> result) {
LOG.info("Sent message with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
LOG.error("Unable to send message due to : " + ex.getMessage());
}
});
return "OK";
}
}
但是如果现在我想发送一个新的 DTO 对象呢?我是否必须声明一个新的 KafkaTemplate<String,NEWOBJECT>
并自动装配在每个对象的配置中声明的每个 kafka 模板?还有另一种方法可以只声明一个 kafkaTemplate,我可以在其中发送任何类型的对象并自动在 JSON?
中序列化
我认为,您可以指定一个泛型 KafkaTemplate<String, Object>
并将生产者值序列化程序设置为 JsonSerializer
,如下所示:
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
引用您的代码:
- Value Serializer 被正确定义为 JsonSerializer,它将任何类型的对象转换为 JSON。
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
- 在 KafkaConfig & Controller 的每个地方将
更改为 。
Keep in mind that generics remain until compile time (type erasure)
only.
有两种情况:
场景 #1
如果你想使用 KafkaTemplate
将任何类型(如你的问题中提到的)发送到 kafka,那么就不需要声明你自己的 KafkaTemplate
bean 因为 Spring boot 在 KafkaAutoConfiguration
.
中为您完成了此操作
package org.springframework.boot.autoconfigure.kafka;
...
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
}
**Some Note**
:
此配置 class 已使用 @ConditionalOnClass(KafkaTemplate.class)
注释,这意味着:(来自 spring 文档--->)@Conditional 仅在指定时匹配classes 在 class 路径上。
kafkaTemplate bean 方法注释为
@ConditionalOnMissingBean(KafkaTemplate.class)
这意味着:(来自 spring 文档 ---->)@Conditional 仅当 BeanFactory 中已不包含满足指定要求的 bean 时才匹配。
重要! 在纯 java 世界中,KafkaTemplate<?, ?>
不是子类型,例如:KafkaTemplate<String, RequestDTO>
所以你不能这样做:
KafkaTemplate<?, ?> kf1 = ...;
KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error
因为 java 参数化类型是不变的,如 Effective Java 第三版第 31 条所述。但是 spring world 没问题,将被注入到您自己的服务中。您只需要在 kafkaTemplate 属性上指定您自己的通用类型。
例如:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate1;
@Autowired
private KafkaTemplate<Integer, RequestDTO> KafkaTemplate2;
}
场景#2
如果您需要限制 kafka 记录的值类型,那么您需要像这样指定您自己的 kafka bean:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(CorridorTracingConfiguration.class)
public class CorridorKafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
ProducerListener<Object, AbstractMessage> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
现在只能注入
KafkaTemplate<String, AbstractMessage> kafkaTemplate
,密钥类型可以是任何其他而不是 String
。但是你可以通过它发送 AbstractMessage
的任何子类型到 kafka。
用法示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, AbstractMessage> kafkaTemplate;
public void makeTrx(TrxRequest trxRequest) {
kafkaTemplate.send("fraud-request", trxRequest.fromAccountNumber(), new FraudRequest(trxRequest));
}
}
@Accessors(chain = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class FraudRequest extends AbstractMessage {
private float amount;
private String fromAccountNumber;
private String toAccountNumber;
...
}
限制kafka消息的key跟上面的一样
我正在使用 kafka 和 spring 启动,我需要将 JSON 对象发送到 kafka,关键是我能够发送一个对象作为 JSON 配置 KafkaTemplate但仅针对此对象。
package com.bankia.apimanager.config;
import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, RequestDTO> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.bankia.apimanager.controller;
import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {
private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );
private static final String TOPIC = "test";
@Autowired
private KafkaTemplate<String, RequestDTO> sender;
@RequestMapping(value = "/test", method = RequestMethod.GET)
public String postMessage(){
ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
@Override
public void onSuccess(SendResult<String, RequestDTO> result) {
LOG.info("Sent message with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
LOG.error("Unable to send message due to : " + ex.getMessage());
}
});
return "OK";
}
}
但是如果现在我想发送一个新的 DTO 对象呢?我是否必须声明一个新的 KafkaTemplate<String,NEWOBJECT>
并自动装配在每个对象的配置中声明的每个 kafka 模板?还有另一种方法可以只声明一个 kafkaTemplate,我可以在其中发送任何类型的对象并自动在 JSON?
我认为,您可以指定一个泛型 KafkaTemplate<String, Object>
并将生产者值序列化程序设置为 JsonSerializer
,如下所示:
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
引用您的代码:
- Value Serializer 被正确定义为 JsonSerializer,它将任何类型的对象转换为 JSON。
@Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; }
- 在 KafkaConfig & Controller 的每个地方将
更改为 。
Keep in mind that generics remain until compile time (type erasure) only.
有两种情况:
场景 #1
如果你想使用 KafkaTemplate
将任何类型(如你的问题中提到的)发送到 kafka,那么就不需要声明你自己的 KafkaTemplate
bean 因为 Spring boot 在 KafkaAutoConfiguration
.
package org.springframework.boot.autoconfigure.kafka;
...
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
}
**Some Note**
:
此配置 class 已使用
@ConditionalOnClass(KafkaTemplate.class)
注释,这意味着:(来自 spring 文档--->)@Conditional 仅在指定时匹配classes 在 class 路径上。kafkaTemplate bean 方法注释为
@ConditionalOnMissingBean(KafkaTemplate.class)
这意味着:(来自 spring 文档 ---->)@Conditional 仅当 BeanFactory 中已不包含满足指定要求的 bean 时才匹配。重要! 在纯 java 世界中,
KafkaTemplate<?, ?>
不是子类型,例如:KafkaTemplate<String, RequestDTO>
所以你不能这样做:
KafkaTemplate<?, ?> kf1 = ...;
KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error
因为 java 参数化类型是不变的,如 Effective Java 第三版第 31 条所述。但是 spring world 没问题,将被注入到您自己的服务中。您只需要在 kafkaTemplate 属性上指定您自己的通用类型。 例如:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate1;
@Autowired
private KafkaTemplate<Integer, RequestDTO> KafkaTemplate2;
}
场景#2
如果您需要限制 kafka 记录的值类型,那么您需要像这样指定您自己的 kafka bean:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(CorridorTracingConfiguration.class)
public class CorridorKafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
ProducerListener<Object, AbstractMessage> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
现在只能注入
KafkaTemplate<String, AbstractMessage> kafkaTemplate
,密钥类型可以是任何其他而不是 String
。但是你可以通过它发送 AbstractMessage
的任何子类型到 kafka。
用法示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, AbstractMessage> kafkaTemplate;
public void makeTrx(TrxRequest trxRequest) {
kafkaTemplate.send("fraud-request", trxRequest.fromAccountNumber(), new FraudRequest(trxRequest));
}
}
@Accessors(chain = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class FraudRequest extends AbstractMessage {
private float amount;
private String fromAccountNumber;
private String toAccountNumber;
...
}
限制kafka消息的key跟上面的一样