spring 启动后,Kafka 主题未在远程 kafka 上自动创建(并在本地 kafka 服务器上创建)

Kafka topic not created automatically on remote kafka after spring boot start(and create on local kafka server)

1) 我在我的机器上启动 kafka

2) 我用配置启动我的 spring 启动服务器:

@Bean
public NewTopic MyTopic() {
    return new NewTopic("my-topic", 5, (short) 1);
}

@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, byte[]> unpMessageKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

结果 - 服务器成功启动并在 kafka 中创建 my-topic

但是,如果我尝试在远程服务器上使用远程 kafka 进行操作 - 主题不会创建。

并在日志 spring 中写入:

12:35:09.880 [                  main] [INFO ]    o.a.k.clients.admin.AdminClientConfig: [] AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]

如果我将这个 bean 添加到配置中:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "remote_host:9092");
    return new KafkaAdmin(configs);
}

主题创建成功。

1) 为什么会这样?

2) 我必须创建 KafkaAdmin 吗?为什么不需要本地 Kafka?

EDDIT

我当前的配置:

spring:
  kafka:
    bootstrap-servers: remote:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

@Configuration
public class KafkaTopicConfig {

    @Value("${response.topics.topicName}")
    private String topicName;

    @Bean
    public NewTopic responseTopic() {
        return new NewTopic(topicName, 5, (short) 1);
    }

}

开始后我看到:

bootstrap.servers = [remote:9092]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536

...

但主题未创建

KafkaAdmin 是 kafka spring 对象,它在您的 spring 上下文中查找 NewTopic 对象并创建它们。如果您没有 KafkaAdmin,则不会进行任何创建。您可以显式创建 KafkaAdmin(如代码片段中所示)或通过 spring kafka 配置属性间接命令其创建。

KafkaAdmin 很好,因为它与应用程序代码主题的生产或消费无关。

编辑

你肯定有问题;我刚测试过...

spring:
  kafka:
    bootstrap-servers: remote:9092

2019-03-21 09:18:18.354  INFO 58301 --- [           main] o.a.k.clients.admin.AdminClientConfig    
        : AdminClientConfig values: 
    bootstrap.servers = [remote:9092]
    ...

Spring 开机会自动为你配置一个KafkaAdmin,但它使用的是application.yml(或application.properties)。参见 Boot properties。向下滚动到 spring.kafka.bootstrap-servers=。这就是它与 localhost 一起工作的原因(这是默认设置)。

您也不需要 ProducerFactory 或模板;启动将从属性中为您创建它们。