Can we use multiple Kafka templates in Spring Boot?
In my Spring Boot Kafka publisher application, I want to support publishing messages as either String (JSON) or bytes, since I need to support both formats, JSON and Avro. But Spring Boot's KafkaTemplate only lets us define one of them. Is there a way to use both templates, or any other way to support both JSON and Avro?
KafkaTemplate<String, String>
only works for strings, but I also want to publish Avro, which would need something like KafkaTemplate<String, byte[]>.
You can try creating KafkaTemplates with different configurations:
@Bean
public ProducerFactory<String, String> producerFactoryString() {
    Map<String, Object> configProps = new HashMap<>();
    // additional config parameters (bootstrap servers, etc.) ....
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ProducerFactory<String, byte[]> producerFactoryByte() {
    Map<String, Object> configProps = new HashMap<>();
    // additional config parameters (bootstrap servers, etc.) ....
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // ByteArraySerializer matches the byte[] value type (BytesSerializer expects org.apache.kafka.common.utils.Bytes)
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    return new KafkaTemplate<>(producerFactoryString());
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplateByte() {
    return new KafkaTemplate<>(producerFactoryByte());
}
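With both templates registered, the one you need can be injected by type or by bean name. Below is a minimal usage sketch that is not part of the original answer: the service class, topic names, and the assumption that the Avro record has already been serialized to byte[] are all illustrative.

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DualFormatPublisher {

    private final KafkaTemplate<String, String> jsonTemplate;
    private final KafkaTemplate<String, byte[]> avroTemplate;

    // Bean names match the @Bean method names above, so @Qualifier resolves them unambiguously
    public DualFormatPublisher(@Qualifier("kafkaTemplateString") KafkaTemplate<String, String> jsonTemplate,
                               @Qualifier("kafkaTemplateByte") KafkaTemplate<String, byte[]> avroTemplate) {
        this.jsonTemplate = jsonTemplate;
        this.avroTemplate = avroTemplate;
    }

    public void publishJson(String key, String json) {
        jsonTemplate.send("json-topic", key, json);        // "json-topic" is a placeholder topic name
    }

    public void publishAvro(String key, byte[] avroBytes) {
        avroTemplate.send("avro-topic", key, avroBytes);   // "avro-topic" is a placeholder topic name
    }
}

If you prefer the template to handle Avro encoding itself, a KafkaTemplate typed to your Avro record class with Confluent's KafkaAvroSerializer as the value serializer is another option, but that adds a schema-registry dependency.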
You can also create separate Kafka configurations; I had to send data to two different Kafka servers (clusters).
@Configuration
public class KafkaConfig {

    private final MosaicKafkaConfig mosaicKafkaConfig;
    private final StreamKafkaConfig streamKafkaConfig;

    public KafkaConfig(MosaicKafkaConfig mosaicKafkaConfig, StreamKafkaConfig streamKafkaConfig) {
        this.mosaicKafkaConfig = mosaicKafkaConfig;
        this.streamKafkaConfig = streamKafkaConfig;
    }

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForMosaic() {
        // Build a KafkaProperties instance by hand so this factory can point at its own cluster
        KafkaProperties kafkaProperties = new KafkaProperties();

        // SSL truststore settings for the Mosaic cluster
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(mosaicKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(mosaicKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(mosaicKafkaConfig.getSslTrustStoreType());

        // SASL settings
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", mosaicKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", mosaicKafkaConfig.getSaslMechanism());
        props.put("security.protocol", mosaicKafkaConfig.getSaslSecProtocol());

        // Producer settings (value serializer, client id, bootstrap servers) for this cluster
        kafkaProperties.getProducer().setValueSerializer(mosaicKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(mosaicKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(mosaicKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForMosaic(ProducerFactory<Object, Object> kafkaProducerFactoryForMosaic) {
        return new KafkaTemplate<>(kafkaProducerFactoryForMosaic);
    }

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForStream() {
        // Same as above, but driven by the Stream cluster's configuration
        KafkaProperties kafkaProperties = new KafkaProperties();

        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(streamKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(streamKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(streamKafkaConfig.getSslTrustStoreType());

        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", streamKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", streamKafkaConfig.getSaslMechanism());
        props.put("security.protocol", streamKafkaConfig.getSaslSecProtocol());

        kafkaProperties.getProducer().setValueSerializer(streamKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(streamKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(streamKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForStream(ProducerFactory<Object, Object> kafkaProducerFactoryForStream) {
        return new KafkaTemplate<>(kafkaProducerFactoryForStream);
    }
}
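MosaicKafkaConfig and StreamKafkaConfig above are the answerer's own properties holders and are not shown in the answer. A plausible shape for one of them, as a sketch only, assuming Spring Boot @ConfigurationProperties binding and Lombok for the accessors; the prefix and field names are inferred from the getters called above, not taken from the original.

import java.util.List;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import lombok.Data;

@Data
@Component
@ConfigurationProperties(prefix = "mosaic.kafka")   // prefix is an assumption
public class MosaicKafkaConfig {
    private List<String> bootstrapServers;   // KafkaProperties.Producer#setBootstrapServers takes a List
    private String clientID;
    private Class<?> valaueSerializer;       // spelled to match getValaueSerializer() used above
    private String sslTrustStoreLocation;
    private String sslTrustStorePassword;
    private String sslTrustStoreType;
    private String saslConfig;
    private String saslMechanism;
    private String saslSecProtocol;
}

Each template is then injected by its bean name (kafkaTemplateForMosaic or kafkaTemplateForStream), so the producing code can choose which cluster to write to.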