如何在启动上下文之前在内存中加载压缩主题
How to load a compacted topic in memory before starting the context
我在 kafka 中使用了一个紧凑的主题,我在应用程序启动时将其加载到 HashMap 中。
然后我正在听消息的普通主题,并使用从压缩主题构造的 HashMap 处理它们。
在开始收听其他主题之前,如何确保已完整阅读压缩主题并完全初始化 HashMap?
(与 RestControllers 相同)
实施 SmartLifecycle
并加载 start()
中的地图。确保 phase
早于任何其他需要映射的对象。
这是一个老问题,我知道,但我想提供一个更完整的解决方案代码示例,当我自己也在努力解决这个问题时,我最终得到了这个解决方案。
想法是,就像 Gary 在他自己的回答的评论中提到的那样,监听器不是在初始化期间使用的正确对象 - 之后才出现。然而,Garry 的 SmartLifecycle
想法的替代方案是 InitializingBean
,我发现它实施起来不那么复杂,因为它只有一种方法:afterPropertiesSet()
:
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MyCacheInitializer implements InitializingBean {
private final ApplicationProperties applicationProperties; // A custom ConfigurationProperties-class
private final KafkaProperties kafkaProperties;
private final ConsumerFactory<String, Bytes> consumerFactory;
private final MyKafkaMessageProcessor messageProcessor;
@Override
public void afterPropertiesSet() {
String topicName = applicationProperties.getKafka().getConsumer().get("my-consumer").getTopic();
Duration pollTimeout = kafkaProperties.getListener().getPollTimeout();
try (Consumer<String, Bytes> consumer = consumerFactory.createConsumer()) {
consumer.subscribe(List.of(topicName));
log.info("Starting to cache the contents of {}", topicName);
ConsumerRecords<String, Bytes> records;
do {
records = consumer.poll(pollTimeout);
records.forEach(messageProcessor::process);
} while (!records.isEmpty());
}
log.info("Completed caching {}", topicName);
}
}
为了简洁起见,我使用了 Lombok 的 @Slf4j
和 @RequiredArgsConstructor
注释,但这些注释很容易被替换。 ApplicationProperties
class 只是我获取我感兴趣的主题名称的方式。它可以用其他东西替换,但我的实现使用 Lombok 的 @Data
注释,看起来像这个:
@Data
@Configuration
@ConfigurationProperties(prefix = "app")
public class ApplicationProperties {
private Kafka kafka = new Kafka();
@Data
public static class Kafka {
private Map<String, KafkaConsumer> consumer = new HashMap<>();
}
@Data
public static class KafkaConsumer {
private String topic;
}
}
我在 kafka 中使用了一个紧凑的主题,我在应用程序启动时将其加载到 HashMap 中。 然后我正在听消息的普通主题,并使用从压缩主题构造的 HashMap 处理它们。
在开始收听其他主题之前,如何确保已完整阅读压缩主题并完全初始化 HashMap? (与 RestControllers 相同)
实施 SmartLifecycle
并加载 start()
中的地图。确保 phase
早于任何其他需要映射的对象。
这是一个老问题,我知道,但我想提供一个更完整的解决方案代码示例,当我自己也在努力解决这个问题时,我最终得到了这个解决方案。
想法是,就像 Gary 在他自己的回答的评论中提到的那样,监听器不是在初始化期间使用的正确对象 - 之后才出现。然而,Garry 的 SmartLifecycle
想法的替代方案是 InitializingBean
,我发现它实施起来不那么复杂,因为它只有一种方法:afterPropertiesSet()
:
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MyCacheInitializer implements InitializingBean {
private final ApplicationProperties applicationProperties; // A custom ConfigurationProperties-class
private final KafkaProperties kafkaProperties;
private final ConsumerFactory<String, Bytes> consumerFactory;
private final MyKafkaMessageProcessor messageProcessor;
@Override
public void afterPropertiesSet() {
String topicName = applicationProperties.getKafka().getConsumer().get("my-consumer").getTopic();
Duration pollTimeout = kafkaProperties.getListener().getPollTimeout();
try (Consumer<String, Bytes> consumer = consumerFactory.createConsumer()) {
consumer.subscribe(List.of(topicName));
log.info("Starting to cache the contents of {}", topicName);
ConsumerRecords<String, Bytes> records;
do {
records = consumer.poll(pollTimeout);
records.forEach(messageProcessor::process);
} while (!records.isEmpty());
}
log.info("Completed caching {}", topicName);
}
}
为了简洁起见,我使用了 Lombok 的 @Slf4j
和 @RequiredArgsConstructor
注释,但这些注释很容易被替换。 ApplicationProperties
class 只是我获取我感兴趣的主题名称的方式。它可以用其他东西替换,但我的实现使用 Lombok 的 @Data
注释,看起来像这个:
@Data
@Configuration
@ConfigurationProperties(prefix = "app")
public class ApplicationProperties {
private Kafka kafka = new Kafka();
@Data
public static class Kafka {
private Map<String, KafkaConsumer> consumer = new HashMap<>();
}
@Data
public static class KafkaConsumer {
private String topic;
}
}