Micronaut kafka 项目 - 多个消费者,每个消费者都有不同的 bootstrap 服务器和 ssl 证书

Micronaut kafka project - mutiple consumers each with different bootstrap server and ssl certs

我正在尝试建立一个包含多个消费者的 micronaut 项目,每个消费者都有不同的 bootstrap 服务器和 ssl 证书。我没有设置全局 bootstrap 服务器和证书。这是行不通的。任何建议表示赞赏。

另一种选择是将证书合并到一个 jks 文件中并设置全局 bootstrap 和 ssl 配置。

kafka:
  consumers:
    group1:
      bootstrap:
        servers: $someserver1
      ssl:
        keystore:
          location: /keystore.jks
          password: password
        truststore:
          location: /truststore.jks
          password: password
          type: PKCS12
      security:
        protocol: ssl
    group2:
      bootstrap:
        servers: $someserver2
      ssl:
        keystore:
          location: /keystore-1.jks
          password: password
        truststore:
          location: /truststore-1.jks
          password: password
          type: PKCS12
      security:
        protocol: ssl

以上配置工作正常,但您需要禁用 kafka 健康检查或提供组合 ssl 证书,否则 micronaut kafka 健康检查失败。 1.2.7 版本的 micronaut 也是如此。

kafka:
  health.enabled: false
  consumers:
    group1:
      bootstrap:
        servers: $someserver1
      ssl:
        keystore:
          location: /keystore.jks
          password: password
        truststore:
          location: /truststore.jks
          password: password
          type: PKCS12
      security:
        protocol: ssl
    group2:
      bootstrap:
        servers: $someserver2
      ssl:
        keystore:
          location: /keystore-1.jks
          password: password
        truststore:
          location: /truststore-1.jks
          password: password
          type: PKCS12
      security:
        protocol: ssl

问题是消费者 类 没有被正确注释。这是对我有用的:

application.yml中(注意两台服务器使用相同的SSL证书):

kafka:
  consumers:
    group1:
      topic: some-topic-1
      bootstrap:
        servers: server-1:9092
    group2:
      topic: some-topic-2
      bootstrap:
        servers: server-2:9092
  ssl:
    keystore:
      location: /keystore.jks
      password: "password1"
    truststore:
      location: /truststore.jks
      password: "password2"
  security:
    protocol: ssl

然后,注释您的 类 以设置特定于消费者的属性。 Kotlin 中的示例:

@KafkaListener(
    properties = [Property(name = ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, value = "${kafka.consumers.group1.bootstrap.servers}")],
)
class TestConsumer1() {
    @Topic("${kafka.consumers.group1.topic}")
    fun receiveMessage(record: ConsumerRecord<String, String>, acknowledgement: Acknowledgement) {
        TODO("Handle events from server-1:9092 and some-topic-1 :)")
    }
}

有关 @KafkaListenerproperties 的更多信息可在此处找到:https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerConfiguration