在 spring-cloud-stream 中启动消息的简单方法是什么
What's a simple way to initiate messages in spring-cloud-stream
我刚刚加入了一个使用 Spring Cloud Stream 作为 Kafka 包装器的项目。我们的想法是我们将消息传递 API 抽象化,这样我们就可以自由切换消息传递平台。我发现 API 令人费解,尤其是最近一轮的 deprecations that redirect me from the annotation-based to a functional model based on Spring Cloud Function. I feel like I'm missing something, because the prescribed programming model seems to make the simple act of producing a message quite a pain. Instead of something like kafkaTemplate.sendDefault("Hello")
, we have prescribed 怪物,例如:
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
我该如何用这种 API 编写简单的消息驱动代码?
否;用于按计划发布消息。
就是streamBridge.send("bindingName", "something-to-send")
.
绑定可以是 application.yml 中的 pre-configured 或在第一次发送时创建。
我认为将要生成的无限数据源与单个 kafkaTemplate.send()
进行比较是不正确的。如果这只是您逻辑中需要的功能,请考虑使用 StreamBridge
: https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources.
Supplier<Flux>
bean 是框架启动源数据生成逻辑的特殊信号。如果自己动手,就不会那么简单了。我不是说你的 Flux.fromStream(Stream.generate())
可以用一个 Flux.generate()
代替。也不清楚为什么我要使用特殊的调度程序并共享...
如果你不想做复杂的Flux
逻辑,还有一个@PollableBean
方法:https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_suppliers_sources
我刚刚加入了一个使用 Spring Cloud Stream 作为 Kafka 包装器的项目。我们的想法是我们将消息传递 API 抽象化,这样我们就可以自由切换消息传递平台。我发现 API 令人费解,尤其是最近一轮的 deprecations that redirect me from the annotation-based to a functional model based on Spring Cloud Function. I feel like I'm missing something, because the prescribed programming model seems to make the simple act of producing a message quite a pain. Instead of something like kafkaTemplate.sendDefault("Hello")
, we have prescribed 怪物,例如:
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
我该如何用这种 API 编写简单的消息驱动代码?
否;用于按计划发布消息。
就是streamBridge.send("bindingName", "something-to-send")
.
绑定可以是 application.yml 中的 pre-configured 或在第一次发送时创建。
我认为将要生成的无限数据源与单个 kafkaTemplate.send()
进行比较是不正确的。如果这只是您逻辑中需要的功能,请考虑使用 StreamBridge
: https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources.
Supplier<Flux>
bean 是框架启动源数据生成逻辑的特殊信号。如果自己动手,就不会那么简单了。我不是说你的 Flux.fromStream(Stream.generate())
可以用一个 Flux.generate()
代替。也不清楚为什么我要使用特殊的调度程序并共享...
如果你不想做复杂的Flux
逻辑,还有一个@PollableBean
方法:https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_suppliers_sources