在 spring 启动时创建 KafkaTemplate 的正确方法
The correct way for creation of KafkaTemplate in spring boot
我尝试在 spring boot 应用程序中配置 apache kafka。我读了这个 documentation 并按照以下步骤操作:
1) 我将此行添加到 aplication.yaml
:
spring:
kafka:
bootstrap-servers: kafka_host:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
2) 我创建了新主题:
@Bean
public NewTopic responseTopic() {
return new NewTopic("new-topic", 5, (short) 1);
}
现在我想使用 KafkaTemplate
:
private final KafkaTemplate<String, byte[]> kafkaTemplate;
public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
但 Intellij IDE 亮点:
要解决这个问题,我需要创建 bean:
@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
并传递给构造函数属性 greetingProducerFactory()
:
@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_hist4:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
但是如果我需要创建 ProducerFactory 手册,那么在 application.yaml 中设置有什么意义?
默认情况下 KafkaTemplate<Object, Object>
由 Spring 在 KafkaAutoConfiguration
class 中启动创建。由于 Spring 在依赖注入期间考虑通用类型信息,因此无法将默认 bean 自动装配到 KafkaTemplate<String, byte[]>
.
我认为您可以安全地忽略 IDEA 的警告;我在使用不同泛型类型的 Boot 模板中连接没有问题...
@SpringBootApplication
public class So55280173Application {
public static void main(String[] args) {
SpringApplication.run(So55280173Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, Foo foo) {
return args -> {
template.send("so55280173", "foo");
if (foo.template == template) {
System.out.println("they are the same");
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("so55280173", 1, (short) 1);
}
}
@Component
class Foo {
final KafkaTemplate<String, String> template;
@Autowired
Foo(KafkaTemplate<String, String> template) {
this.template = template;
}
}
和
they are the same
我最初遇到了同样的问题,但是当我执行它时没有出现任何错误并且运行良好。
忽略 Intellij IDEA 的警告,这可能是 IDEA 的一个错误,用于确定自动装配的组件。
我尝试在 spring boot 应用程序中配置 apache kafka。我读了这个 documentation 并按照以下步骤操作:
1) 我将此行添加到 aplication.yaml
:
spring:
kafka:
bootstrap-servers: kafka_host:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
2) 我创建了新主题:
@Bean
public NewTopic responseTopic() {
return new NewTopic("new-topic", 5, (short) 1);
}
现在我想使用 KafkaTemplate
:
private final KafkaTemplate<String, byte[]> kafkaTemplate;
public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
但 Intellij IDE 亮点:
要解决这个问题,我需要创建 bean:
@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
并传递给构造函数属性 greetingProducerFactory()
:
@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_hist4:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
但是如果我需要创建 ProducerFactory 手册,那么在 application.yaml 中设置有什么意义?
默认情况下 KafkaTemplate<Object, Object>
由 Spring 在 KafkaAutoConfiguration
class 中启动创建。由于 Spring 在依赖注入期间考虑通用类型信息,因此无法将默认 bean 自动装配到 KafkaTemplate<String, byte[]>
.
我认为您可以安全地忽略 IDEA 的警告;我在使用不同泛型类型的 Boot 模板中连接没有问题...
@SpringBootApplication
public class So55280173Application {
public static void main(String[] args) {
SpringApplication.run(So55280173Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, Foo foo) {
return args -> {
template.send("so55280173", "foo");
if (foo.template == template) {
System.out.println("they are the same");
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("so55280173", 1, (short) 1);
}
}
@Component
class Foo {
final KafkaTemplate<String, String> template;
@Autowired
Foo(KafkaTemplate<String, String> template) {
this.template = template;
}
}
和
they are the same
我最初遇到了同样的问题,但是当我执行它时没有出现任何错误并且运行良好。
忽略 Intellij IDEA 的警告,这可能是 IDEA 的一个错误,用于确定自动装配的组件。