如何在应用程序属性中拥有多个 kafka 消费者组
How to have multiple kafka consumer groups in application properties
我正在使用 spring kafka,并希望在应用程序 yaml 中提到多个消费者组 ID,我可以在我正在收听多个主题的 kafkaListener class 中使用它们。
我在 application.yml 文件中的 kafka 属性现在看起来像这样
kafka:
properties:
topics:
topic1: topic1
topic2: topic2
bootstrap-servers: server1,server2
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 4
consumer:
group-id: mygroupid
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
有没有办法在上面的消费者块中有多个groupID。
或者,我在 spring 代码中的 kafkaListener 中给出了不同的组 ID,如下所示,我不确定如何设置其余的属性,如自动偏移重置、密钥解串器等:
@KafkaListener(topics = "topic1", groupId = "cd1")
public void consumeMessage(String message) throws Exception {
// some code goes here
}
请告诉我如何完成我想做的事情,因为我是 kafka 的新手。
引导仅从 yml 中自动配置一个 DefaultKafkaConsumerFactory
和 DefaultKafkaConsumerFactory
,因此所有属性在所有消费者之间共享。这就是我们添加 groupId
的原因(如果未提供 groupId
,则使用 id
);这是消费者之间最常见的 属性 变化。
当然,您可以使用 属性 占位符,这样 groupId = "${group.one}"
将使用 yml 中的 属性 group.one
。
要更改更基本的东西,例如 serializers/deserializers,如果您使用的是 2.2.4 之前的版本,则需要创建多个工厂和容器工厂。
但是,从版本 2.2.4 开始,您现在可以在 KafkaListener
注释中设置任意 kafka 消费者 属性...
/**
* Kafka consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <h3>Supported Syntax</h3>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <ul>
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* @return the properties.
* @since 2.2.4
* @see org.apache.kafka.clients.consumer.ConsumerConfig
* @see #groupId()
* @see #clientIdPrefix()
*/
String[] properties() default {};
请注意,这些属性采用本机 kafka 点格式 (auto.offset.reset
),而不是启动连字符或驼峰式属性。
这是来自 the documentation 的示例:
@KafkaListener(topics = "myTopic", groupId="group", properties= {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
同样,值可以是 属性 占位符。
在生产者方面,您仍然需要多个工厂。
我正在使用 spring kafka,并希望在应用程序 yaml 中提到多个消费者组 ID,我可以在我正在收听多个主题的 kafkaListener class 中使用它们。
我在 application.yml 文件中的 kafka 属性现在看起来像这样
kafka:
properties:
topics:
topic1: topic1
topic2: topic2
bootstrap-servers: server1,server2
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 4
consumer:
group-id: mygroupid
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
有没有办法在上面的消费者块中有多个groupID。
或者,我在 spring 代码中的 kafkaListener 中给出了不同的组 ID,如下所示,我不确定如何设置其余的属性,如自动偏移重置、密钥解串器等:
@KafkaListener(topics = "topic1", groupId = "cd1")
public void consumeMessage(String message) throws Exception {
// some code goes here
}
请告诉我如何完成我想做的事情,因为我是 kafka 的新手。
引导仅从 yml 中自动配置一个 DefaultKafkaConsumerFactory
和 DefaultKafkaConsumerFactory
,因此所有属性在所有消费者之间共享。这就是我们添加 groupId
的原因(如果未提供 groupId
,则使用 id
);这是消费者之间最常见的 属性 变化。
当然,您可以使用 属性 占位符,这样 groupId = "${group.one}"
将使用 yml 中的 属性 group.one
。
要更改更基本的东西,例如 serializers/deserializers,如果您使用的是 2.2.4 之前的版本,则需要创建多个工厂和容器工厂。
但是,从版本 2.2.4 开始,您现在可以在 KafkaListener
注释中设置任意 kafka 消费者 属性...
/**
* Kafka consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <h3>Supported Syntax</h3>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <ul>
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* @return the properties.
* @since 2.2.4
* @see org.apache.kafka.clients.consumer.ConsumerConfig
* @see #groupId()
* @see #clientIdPrefix()
*/
String[] properties() default {};
请注意,这些属性采用本机 kafka 点格式 (auto.offset.reset
),而不是启动连字符或驼峰式属性。
这是来自 the documentation 的示例:
@KafkaListener(topics = "myTopic", groupId="group", properties= {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
同样,值可以是 属性 占位符。
在生产者方面,您仍然需要多个工厂。