Micronaut kafka 项目 - 多个消费者,每个消费者都有不同的 bootstrap 服务器和 ssl 证书
Micronaut kafka project - mutiple consumers each with different bootstrap server and ssl certs
我正在尝试建立一个包含多个消费者的 micronaut 项目,每个消费者都有不同的 bootstrap 服务器和 ssl 证书。我没有设置全局 bootstrap 服务器和证书。这是行不通的。任何建议表示赞赏。
另一种选择是将证书合并到一个 jks 文件中并设置全局 bootstrap 和 ssl 配置。
kafka:
consumers:
group1:
bootstrap:
servers: $someserver1
ssl:
keystore:
location: /keystore.jks
password: password
truststore:
location: /truststore.jks
password: password
type: PKCS12
security:
protocol: ssl
group2:
bootstrap:
servers: $someserver2
ssl:
keystore:
location: /keystore-1.jks
password: password
truststore:
location: /truststore-1.jks
password: password
type: PKCS12
security:
protocol: ssl
以上配置工作正常,但您需要禁用 kafka 健康检查或提供组合 ssl 证书,否则 micronaut kafka 健康检查失败。 1.2.7 版本的 micronaut 也是如此。
kafka:
health.enabled: false
consumers:
group1:
bootstrap:
servers: $someserver1
ssl:
keystore:
location: /keystore.jks
password: password
truststore:
location: /truststore.jks
password: password
type: PKCS12
security:
protocol: ssl
group2:
bootstrap:
servers: $someserver2
ssl:
keystore:
location: /keystore-1.jks
password: password
truststore:
location: /truststore-1.jks
password: password
type: PKCS12
security:
protocol: ssl
问题是消费者 类 没有被正确注释。这是对我有用的:
在application.yml
中(注意两台服务器使用相同的SSL证书):
kafka:
consumers:
group1:
topic: some-topic-1
bootstrap:
servers: server-1:9092
group2:
topic: some-topic-2
bootstrap:
servers: server-2:9092
ssl:
keystore:
location: /keystore.jks
password: "password1"
truststore:
location: /truststore.jks
password: "password2"
security:
protocol: ssl
然后,注释您的 类 以设置特定于消费者的属性。 Kotlin 中的示例:
@KafkaListener(
properties = [Property(name = ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, value = "${kafka.consumers.group1.bootstrap.servers}")],
)
class TestConsumer1() {
@Topic("${kafka.consumers.group1.topic}")
fun receiveMessage(record: ConsumerRecord<String, String>, acknowledgement: Acknowledgement) {
TODO("Handle events from server-1:9092 and some-topic-1 :)")
}
}
有关 @KafkaListener
和 properties
的更多信息可在此处找到:https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerConfiguration
我正在尝试建立一个包含多个消费者的 micronaut 项目,每个消费者都有不同的 bootstrap 服务器和 ssl 证书。我没有设置全局 bootstrap 服务器和证书。这是行不通的。任何建议表示赞赏。
另一种选择是将证书合并到一个 jks 文件中并设置全局 bootstrap 和 ssl 配置。
kafka:
consumers:
group1:
bootstrap:
servers: $someserver1
ssl:
keystore:
location: /keystore.jks
password: password
truststore:
location: /truststore.jks
password: password
type: PKCS12
security:
protocol: ssl
group2:
bootstrap:
servers: $someserver2
ssl:
keystore:
location: /keystore-1.jks
password: password
truststore:
location: /truststore-1.jks
password: password
type: PKCS12
security:
protocol: ssl
以上配置工作正常,但您需要禁用 kafka 健康检查或提供组合 ssl 证书,否则 micronaut kafka 健康检查失败。 1.2.7 版本的 micronaut 也是如此。
kafka:
health.enabled: false
consumers:
group1:
bootstrap:
servers: $someserver1
ssl:
keystore:
location: /keystore.jks
password: password
truststore:
location: /truststore.jks
password: password
type: PKCS12
security:
protocol: ssl
group2:
bootstrap:
servers: $someserver2
ssl:
keystore:
location: /keystore-1.jks
password: password
truststore:
location: /truststore-1.jks
password: password
type: PKCS12
security:
protocol: ssl
问题是消费者 类 没有被正确注释。这是对我有用的:
在application.yml
中(注意两台服务器使用相同的SSL证书):
kafka:
consumers:
group1:
topic: some-topic-1
bootstrap:
servers: server-1:9092
group2:
topic: some-topic-2
bootstrap:
servers: server-2:9092
ssl:
keystore:
location: /keystore.jks
password: "password1"
truststore:
location: /truststore.jks
password: "password2"
security:
protocol: ssl
然后,注释您的 类 以设置特定于消费者的属性。 Kotlin 中的示例:
@KafkaListener(
properties = [Property(name = ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, value = "${kafka.consumers.group1.bootstrap.servers}")],
)
class TestConsumer1() {
@Topic("${kafka.consumers.group1.topic}")
fun receiveMessage(record: ConsumerRecord<String, String>, acknowledgement: Acknowledgement) {
TODO("Handle events from server-1:9092 and some-topic-1 :)")
}
}
有关 @KafkaListener
和 properties
的更多信息可在此处找到:https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerConfiguration