Spring 带有动态@KafkaListener 的 Kafka

Spring Kafka with Dynamic @KafkaListener

我正在使用 Spring Boot 2.x 和 spring-kafka(不是 spring -集成-kafka)

我有多个用 @KafkaListener 注释的 bean ...每个都从一个主题消费...所以因为我有 12 个主题,所以我还需要有 12 个 KafkaConsumers bean ...我想知道我是否可以以编程方式/动态地创建这些 bean ...也许使用 KafkaListenerEndpointRegistry 来动态创建消费者容器。

注意:我需要批量消费消息...所以也许我可以使用 BatchMessageListener?

当前代码:

@KafkaListener(
        id = COUNTRY,
        containerFactory = KAFKA_LISTENER_FACTORY_BEAN_NAME,
        topics = {TOPIC},
        groupId = GROUP_ID,
        clientIdPrefix = CLIENT_ID,
        errorHandler = VALIDATION_ERROR_HANDLER_BEAN_NAME
    )
    @Override
    public void consume(final List<MessageDTO> messages,
        @Header(KafkaHeaders.RECEIVED_TOPIC) final List<String> topics,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final List<String> messagesKey,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitionIds,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final List<Long> timestamps,
        @Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
            (...)
    }

每个主题使用者都有自己的实现,具体取决于主题。你们能给我指点 blog/pseudocode/git thread/answer 吗?

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/support/GenericApplicationContext.html#registerBean-java.lang.Class-java.util.function.Supplier-org.springframework.beans.factory.config.BeanDefinitionCustomizer...-

创建您的对象并将其注册为一个 bean,通过上述方法中的 Supplier 提供它。 Spring 将 运行 设置一切所需的 bean post 处理器。

如果您的主题有某种模式,您也可以试试这个:

      kafka:
        bindings:
            input.consumer.destination-is-pattern: true