如何在启动上下文之前在内存中加载压缩主题

How to load a compacted topic in memory before starting the context

我在 kafka 中使用了一个紧凑的主题,我在应用程序启动时将其加载到 HashMap 中。 然后我正在听消息的普通主题,并使用从压缩主题构造的 HashMap 处理它们。

在开始收听其他主题之前,如何确保已完整阅读压缩主题并完全初始化 HashMap? (与 RestControllers 相同)

实施 SmartLifecycle 并加载 start() 中的地图。确保 phase 早于任何其他需要映射的对象。

这是一个老问题,我知道,但我想提供一个更完整的解决方案代码示例,当我自己也在努力解决这个问题时,我最终得到了这个解决方案。

想法是,就像 Gary 在他自己的回答的评论中提到的那样,监听器不是在初始化期间使用的正确对象 - 之后才出现。然而,Garry 的 SmartLifecycle 想法的替代方案是 InitializingBean,我发现它实施起来不那么复杂,因为它只有一种方法:afterPropertiesSet():

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MyCacheInitializer implements InitializingBean {

    private final ApplicationProperties applicationProperties; // A custom ConfigurationProperties-class
    private final KafkaProperties kafkaProperties;
    private final ConsumerFactory<String, Bytes> consumerFactory;
    private final MyKafkaMessageProcessor messageProcessor;

    @Override
    public void afterPropertiesSet() {
        String topicName = applicationProperties.getKafka().getConsumer().get("my-consumer").getTopic();
        Duration pollTimeout = kafkaProperties.getListener().getPollTimeout();

        try (Consumer<String, Bytes> consumer = consumerFactory.createConsumer()) {
            consumer.subscribe(List.of(topicName));

            log.info("Starting to cache the contents of {}", topicName);

            ConsumerRecords<String, Bytes> records;

            do {
                records = consumer.poll(pollTimeout);
                records.forEach(messageProcessor::process);
            } while (!records.isEmpty());
        }

        log.info("Completed caching {}", topicName);
    }
}

为了简洁起见,我使用了 Lombok 的 @Slf4j@RequiredArgsConstructor 注释,但这些注释很容易被替换。 ApplicationProperties class 只是我获取我感兴趣的主题名称的方式。它可以用其他东西替换,但我的实现使用 Lombok 的 @Data 注释,看起来像这个:

@Data
@Configuration
@ConfigurationProperties(prefix = "app")
public class ApplicationProperties {

    private Kafka kafka = new Kafka();

    @Data
    public static class Kafka {
        private Map<String, KafkaConsumer> consumer = new HashMap<>();
    }

    @Data
    public static class KafkaConsumer {
        private String topic;
    }
}