Spring 带有动态@KafkaListener 的 Kafka
Spring Kafka with Dynamic @KafkaListener
我正在使用 Spring Boot 2.x 和 spring-kafka(不是 spring -集成-kafka)
我有多个用 @KafkaListener
注释的 bean ...每个都从一个主题消费...所以因为我有 12 个主题,所以我还需要有 12 个 KafkaConsumers bean ...我想知道我是否可以以编程方式/动态地创建这些 bean ...也许使用 KafkaListenerEndpointRegistry 来动态创建消费者容器。
注意:我需要批量消费消息...所以也许我可以使用 BatchMessageListener?
当前代码:
@KafkaListener(
id = COUNTRY,
containerFactory = KAFKA_LISTENER_FACTORY_BEAN_NAME,
topics = {TOPIC},
groupId = GROUP_ID,
clientIdPrefix = CLIENT_ID,
errorHandler = VALIDATION_ERROR_HANDLER_BEAN_NAME
)
@Override
public void consume(final List<MessageDTO> messages,
@Header(KafkaHeaders.RECEIVED_TOPIC) final List<String> topics,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final List<String> messagesKey,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitionIds,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) final List<Long> timestamps,
@Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
(...)
}
每个主题使用者都有自己的实现,具体取决于主题。你们能给我指点 blog/pseudocode/git thread/answer 吗?
创建您的对象并将其注册为一个 bean,通过上述方法中的 Supplier 提供它。 Spring 将 运行 设置一切所需的 bean post 处理器。
如果您的主题有某种模式,您也可以试试这个:
kafka:
bindings:
input.consumer.destination-is-pattern: true
我正在使用 Spring Boot 2.x 和 spring-kafka(不是 spring -集成-kafka)
我有多个用 @KafkaListener
注释的 bean ...每个都从一个主题消费...所以因为我有 12 个主题,所以我还需要有 12 个 KafkaConsumers bean ...我想知道我是否可以以编程方式/动态地创建这些 bean ...也许使用 KafkaListenerEndpointRegistry 来动态创建消费者容器。
注意:我需要批量消费消息...所以也许我可以使用 BatchMessageListener?
当前代码:
@KafkaListener(
id = COUNTRY,
containerFactory = KAFKA_LISTENER_FACTORY_BEAN_NAME,
topics = {TOPIC},
groupId = GROUP_ID,
clientIdPrefix = CLIENT_ID,
errorHandler = VALIDATION_ERROR_HANDLER_BEAN_NAME
)
@Override
public void consume(final List<MessageDTO> messages,
@Header(KafkaHeaders.RECEIVED_TOPIC) final List<String> topics,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final List<String> messagesKey,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitionIds,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) final List<Long> timestamps,
@Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
(...)
}
每个主题使用者都有自己的实现,具体取决于主题。你们能给我指点 blog/pseudocode/git thread/answer 吗?
创建您的对象并将其注册为一个 bean,通过上述方法中的 Supplier 提供它。 Spring 将 运行 设置一切所需的 bean post 处理器。
如果您的主题有某种模式,您也可以试试这个:
kafka:
bindings:
input.consumer.destination-is-pattern: true