在一个应用程序中使用多个不同的 kafka 集群
using multiple different kafka cluster within one app
这可能不是典型的设置,但由于更高的决策,我们最终在一个应用程序中有多个 kafka 集群,每个集群有多个主题,并且每个集群可能有不同的序列化策略。 Json/avro。 avro 可能使用融合模式注册表或使用单一对象编码。
好吧,我通过构建我自己的抽象和注册表来分析配置并手动创建大部分内容,但我觉得我需要重复主题名称、模式注册表等内容 url多个地方多次只是为了创建所有需要的 beans。丑死了。
我想问一下,是否有更好的方法和支持,我可能忽略了。
我需要创建 N 个表示的 kafka 集群,配置一次。针对给定的 kafka 集群配置主题,在适用的情况下为主题配置融合模式注册表等,以便我可以创建 Avro 模式文件的实例,将其发送到 KafkaTemplate,它就会工作。
这取决于配置的复杂性和差异程度,至于这是否有帮助,但您可以在 @KafkaListener
和每个 KafkaTemplate
.
例如
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
编辑
可以外部化覆盖 - 像这样:
@KafkaListener(id = "so67959209", topics = "so67959209",
properties = "${consumer.overrides.one}")
public void listen(byte[] in) {
}
和
consumer.overrides.one=bootstrap.servers:localhost:9092\n \
key.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer\n \
value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer
重要 覆盖是原始 Kafka 属性 名称,而不是引导版本 - 例如bootstrap.servers
比。靴子的 bootstrap-servers
.
这可能不是典型的设置,但由于更高的决策,我们最终在一个应用程序中有多个 kafka 集群,每个集群有多个主题,并且每个集群可能有不同的序列化策略。 Json/avro。 avro 可能使用融合模式注册表或使用单一对象编码。
好吧,我通过构建我自己的抽象和注册表来分析配置并手动创建大部分内容,但我觉得我需要重复主题名称、模式注册表等内容 url多个地方多次只是为了创建所有需要的 beans。丑死了。
我想问一下,是否有更好的方法和支持,我可能忽略了。
我需要创建 N 个表示的 kafka 集群,配置一次。针对给定的 kafka 集群配置主题,在适用的情况下为主题配置融合模式注册表等,以便我可以创建 Avro 模式文件的实例,将其发送到 KafkaTemplate,它就会工作。
这取决于配置的复杂性和差异程度,至于这是否有帮助,但您可以在 @KafkaListener
和每个 KafkaTemplate
.
例如
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
编辑
可以外部化覆盖 - 像这样:
@KafkaListener(id = "so67959209", topics = "so67959209",
properties = "${consumer.overrides.one}")
public void listen(byte[] in) {
}
和
consumer.overrides.one=bootstrap.servers:localhost:9092\n \
key.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer\n \
value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer
重要 覆盖是原始 Kafka 属性 名称,而不是引导版本 - 例如bootstrap.servers
比。靴子的 bootstrap-servers
.