集成Spring Boot和Reactor-Kafka的KafkaReceiver
Integrating Spring Boot and Reactor-Kafka's KafkaReceiver
我正在尝试使用库 reactor-kafka
开发一个 Spring 启动应用程序来响应从 Kafka 主题读取的一些消息。
我有一个构建 KafkaReceiver
的配置 class。
@Configuration
public class MyConfiguration {
@Bean
public KafkaReceiver<String, String> kafkaReceiver() {
Map<String, Object> props = new HashMap<>();
// Options initialisation...
final ReceiverOptions<String, String> receiverOptions =
ReceiverOptions.<String, string>create(props)
.subscription(Collections.singleton(consumer.getTopic()));
return KafkaReceiver.create(receiverOptions);
}
}
嗯……现在呢?使用不那么反应的 spring-kafka
库,我可以用 @KafkaListener
注释一个方法,并且 Spring Boot 会为我创建一个从 Kafka 主题监听的线程。
我应该把 KafkaReceiver
放在哪里?在所有示例中我发现直接使用 main
方法,但这不是 引导方式 。
我正在使用 Spring Boot 2.1.3 和 Reactor-Kafka 1.1.0
提前致谢。
既然你有那个 KafkaReceiver
bean,现在你可以这样做:
@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
return args -> {
kafkaReceiver.receive()
...
.sunbscribe();
};
}
这个 ApplicationRunner
bean 将在 ApplicationContext
准备就绪时被踢出。有关详细信息,请参阅其 JavaDocs。
我正在尝试使用库 reactor-kafka
开发一个 Spring 启动应用程序来响应从 Kafka 主题读取的一些消息。
我有一个构建 KafkaReceiver
的配置 class。
@Configuration
public class MyConfiguration {
@Bean
public KafkaReceiver<String, String> kafkaReceiver() {
Map<String, Object> props = new HashMap<>();
// Options initialisation...
final ReceiverOptions<String, String> receiverOptions =
ReceiverOptions.<String, string>create(props)
.subscription(Collections.singleton(consumer.getTopic()));
return KafkaReceiver.create(receiverOptions);
}
}
嗯……现在呢?使用不那么反应的 spring-kafka
库,我可以用 @KafkaListener
注释一个方法,并且 Spring Boot 会为我创建一个从 Kafka 主题监听的线程。
我应该把 KafkaReceiver
放在哪里?在所有示例中我发现直接使用 main
方法,但这不是 引导方式 。
我正在使用 Spring Boot 2.1.3 和 Reactor-Kafka 1.1.0
提前致谢。
既然你有那个 KafkaReceiver
bean,现在你可以这样做:
@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
return args -> {
kafkaReceiver.receive()
...
.sunbscribe();
};
}
这个 ApplicationRunner
bean 将在 ApplicationContext
准备就绪时被踢出。有关详细信息,请参阅其 JavaDocs。