@KafkaListener 与 ConsumerFactory groupId

@KafkaListener vs ConsumerFactory groupId

我遵循了 baeldung.com 的“Intro to Apache Kafka with Spring”教程。 我用 kafkaConsumerFactory 方法设置了 KafkaConsumerConfig class:

private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}

和两个“自定义”工厂:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("foo");
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("bar");
}

MessageListener class 中,我改为使用 @KafkaListener 注释来注册具有给定 groupId 的消费者以收听主题:

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group 'foo': " + message);
    ...
}

@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
    System.out.println("Received Message in group 'bar': " + message);
    ...
}

这样就有两组消费者,一组有groupId“foo”,一组有groupId“bar”。

现在,如果我以这种方式将“foo”消费者的容器工厂从 fooKafkaListenerContainerFactory 更改为 barKafkaListenerContainerFactory

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    ...
}

KafkaListenergroupId 和 container factory 的 groupId 似乎不兼容,但没有任何变化。 所以,我想了解的是 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);属性 的作用以及为什么它似乎没有被考虑。

工厂 groupId 是默认值,仅当 @KafkaListener 上没有 groupId(或 id)时才使用。

在早期版本中,只能在工厂上设置 groupId,这意味着如果需要不同的组,则需要为每个侦听器创建一个单独的工厂,这违背了可用于多个工厂的想法听众。

查看 javadoc...

/**
 * Override the {@code group.id} property for the consumer factory with this value
 * for this listener only.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the group id.
 * @since 1.3
 */
String groupId() default "";

/**
 * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
 * provided) as the {@code group.id} property for the consumer. Set to false, to use
 * the {@code group.id} from the consumer factory.
 * @return false to disable.
 * @since 1.3
 */
boolean idIsGroup() default true;