@KafkaListener 与 ConsumerFactory groupId
@KafkaListener vs ConsumerFactory groupId
我遵循了 baeldung.com 的“Intro to Apache Kafka with Spring”教程。
我用 kafkaConsumerFactory
方法设置了 KafkaConsumerConfig
class:
private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
...
return new DefaultKafkaConsumerFactory<>(props);
}
和两个“自定义”工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
在 MessageListener
class 中,我改为使用 @KafkaListener
注释来注册具有给定 groupId
的消费者以收听主题:
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + message);
...
}
@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
System.out.println("Received Message in group 'bar': " + message);
...
}
这样就有两组消费者,一组有groupId“foo”,一组有groupId“bar”。
现在,如果我以这种方式将“foo”消费者的容器工厂从 fooKafkaListenerContainerFactory
更改为 barKafkaListenerContainerFactory
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
...
}
KafkaListener
的 groupId
和 container factory 的 groupId
似乎不兼容,但没有任何变化。
所以,我想了解的是 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
属性 的作用以及为什么它似乎没有被考虑。
工厂 groupId
是默认值,仅当 @KafkaListener
上没有 groupId
(或 id
)时才使用。
在早期版本中,只能在工厂上设置 groupId,这意味着如果需要不同的组,则需要为每个侦听器创建一个单独的工厂,这违背了可用于多个工厂的想法听众。
查看 javadoc...
/**
* Override the {@code group.id} property for the consumer factory with this value
* for this listener only.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the group id.
* @since 1.3
*/
String groupId() default "";
/**
* When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
* provided) as the {@code group.id} property for the consumer. Set to false, to use
* the {@code group.id} from the consumer factory.
* @return false to disable.
* @since 1.3
*/
boolean idIsGroup() default true;
我遵循了 baeldung.com 的“Intro to Apache Kafka with Spring”教程。
我用 kafkaConsumerFactory
方法设置了 KafkaConsumerConfig
class:
private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
...
return new DefaultKafkaConsumerFactory<>(props);
}
和两个“自定义”工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
在 MessageListener
class 中,我改为使用 @KafkaListener
注释来注册具有给定 groupId
的消费者以收听主题:
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + message);
...
}
@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
System.out.println("Received Message in group 'bar': " + message);
...
}
这样就有两组消费者,一组有groupId“foo”,一组有groupId“bar”。
现在,如果我以这种方式将“foo”消费者的容器工厂从 fooKafkaListenerContainerFactory
更改为 barKafkaListenerContainerFactory
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
...
}
KafkaListener
的 groupId
和 container factory 的 groupId
似乎不兼容,但没有任何变化。
所以,我想了解的是 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
属性 的作用以及为什么它似乎没有被考虑。
工厂 groupId
是默认值,仅当 @KafkaListener
上没有 groupId
(或 id
)时才使用。
在早期版本中,只能在工厂上设置 groupId,这意味着如果需要不同的组,则需要为每个侦听器创建一个单独的工厂,这违背了可用于多个工厂的想法听众。
查看 javadoc...
/**
* Override the {@code group.id} property for the consumer factory with this value
* for this listener only.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the group id.
* @since 1.3
*/
String groupId() default "";
/**
* When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
* provided) as the {@code group.id} property for the consumer. Set to false, to use
* the {@code group.id} from the consumer factory.
* @return false to disable.
* @since 1.3
*/
boolean idIsGroup() default true;