spring kafka 中是否有多个生产者的代码示例?
Is there a code sample for multiple producers in spring kafka?
我有一个应用程序可能需要多个生产者。我看到的所有代码示例似乎都支持单个生产者,在应用程序启动期间从应用程序读取配置。如果有多个生产者并且我们想传入不同的生产者配置,Spring 中是否有开箱即用的支持?或者在那种情况下我应该不带 spring 吗?
您可以通过相同的 ProducerFactory
.
创建多个 Producer
实例 (KafkaTemplate
)
如果您需要不同的 Kafka 配置,则需要不同的 ProducerFactory
个实例。
您必须创建两个不同的 ProducerFactory
下面是示例
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> confluentProducerFactory() {
HashMap<String, Object> configProps = new HashMap<String, Object>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, String> cloudraProducerFactory() {
HashMap<String, Object> configProps = new HashMap<String, Object>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9094");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean(name = "confluent")
public KafkaTemplate<String, String> confluentKafkaTemplate() {
return new KafkaTemplate<>(confluentProducerFactory());
}
@Bean(name = "cloudera")
public KafkaTemplate<String, String> clouderaKafkaTemplate() {
return new KafkaTemplate<>(cloudraProducerFactory());
}
}
public class ProducerExample {
@Autowired
@Qualifier("cloudera")
private KafkaTemplate clouderaKafkaTemplate;
@Autowired
@Qualifier("confluent")
private KafkaTemplate confluentKafkaTemplate;
public void send() {
confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
}
}
从 2.5 版开始,您可以根据目标主题名称在运行时使用 RoutingKafkaTemplate select 生产者。
https://docs.spring.io/spring-kafka/reference/html/#routing-template
如果您仍想像往常一样在 application.yaml
中保留您的配置,并尽可能地保持 Java 配置,您可以扩展 KafkaProperties.Producer
.
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer {
private final KafkaProperties common;
@Qualifier("producer-1")
@Bean
public ProducerFactory<?, ?> producerFactory() {
final var conf = new HashMap<>(
this.common.buildProducerProperties()
);
conf.putAll(this.buildProperties());
return new DefaultKafkaProducerFactory<>(conf);
}
@Qualifier("producer-1")
@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(this.producerFactory());
}
}
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer {
private final KafkaProperties common;
@Qualifier("producer-2")
@Bean
public ProducerFactory<?, ?> producerFactory() {
final var conf = new HashMap<>(
this.common.buildProducerProperties()
);
conf.putAll(this.buildProperties());
return new DefaultKafkaProducerFactory<>(conf);
}
@Qualifier("producer-2")
@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(this.producerFactory());
}
}
我有一个应用程序可能需要多个生产者。我看到的所有代码示例似乎都支持单个生产者,在应用程序启动期间从应用程序读取配置。如果有多个生产者并且我们想传入不同的生产者配置,Spring 中是否有开箱即用的支持?或者在那种情况下我应该不带 spring 吗?
您可以通过相同的 ProducerFactory
.
Producer
实例 (KafkaTemplate
)
如果您需要不同的 Kafka 配置,则需要不同的 ProducerFactory
个实例。
您必须创建两个不同的 ProducerFactory
下面是示例
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> confluentProducerFactory() {
HashMap<String, Object> configProps = new HashMap<String, Object>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, String> cloudraProducerFactory() {
HashMap<String, Object> configProps = new HashMap<String, Object>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9094");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean(name = "confluent")
public KafkaTemplate<String, String> confluentKafkaTemplate() {
return new KafkaTemplate<>(confluentProducerFactory());
}
@Bean(name = "cloudera")
public KafkaTemplate<String, String> clouderaKafkaTemplate() {
return new KafkaTemplate<>(cloudraProducerFactory());
}
}
public class ProducerExample {
@Autowired
@Qualifier("cloudera")
private KafkaTemplate clouderaKafkaTemplate;
@Autowired
@Qualifier("confluent")
private KafkaTemplate confluentKafkaTemplate;
public void send() {
confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
}
}
从 2.5 版开始,您可以根据目标主题名称在运行时使用 RoutingKafkaTemplate select 生产者。 https://docs.spring.io/spring-kafka/reference/html/#routing-template
如果您仍想像往常一样在 application.yaml
中保留您的配置,并尽可能地保持 Java 配置,您可以扩展 KafkaProperties.Producer
.
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer {
private final KafkaProperties common;
@Qualifier("producer-1")
@Bean
public ProducerFactory<?, ?> producerFactory() {
final var conf = new HashMap<>(
this.common.buildProducerProperties()
);
conf.putAll(this.buildProperties());
return new DefaultKafkaProducerFactory<>(conf);
}
@Qualifier("producer-1")
@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(this.producerFactory());
}
}
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer {
private final KafkaProperties common;
@Qualifier("producer-2")
@Bean
public ProducerFactory<?, ?> producerFactory() {
final var conf = new HashMap<>(
this.common.buildProducerProperties()
);
conf.putAll(this.buildProperties());
return new DefaultKafkaProducerFactory<>(conf);
}
@Qualifier("producer-2")
@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(this.producerFactory());
}
}