在 spring-cloud-stream 中启动消息的简单方法是什么

What's a simple way to initiate messages in spring-cloud-stream

我刚刚加入了一个使用 Spring Cloud Stream 作为 Kafka 包装器的项目。我们的想法是我们将消息传递 API 抽象化,这样我们就可以自由切换消息传递平台。我发现 API 令人费解,尤其是最近一轮的 deprecations that redirect me from the annotation-based to a functional model based on Spring Cloud Function. I feel like I'm missing something, because the prescribed programming model seems to make the simple act of producing a message quite a pain. Instead of something like kafkaTemplate.sendDefault("Hello"), we have prescribed 怪物,例如:

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }

我该如何用这种 API 编写简单的消息驱动代码?

否;用于按计划发布消息。

https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

就是streamBridge.send("bindingName", "something-to-send").

绑定可以是 application.yml 中的 pre-configured 或在第一次发送时创建。

我认为将要生成的无限数据源与单个 kafkaTemplate.send() 进行比较是不正确的。如果这只是您逻辑中需要的功能,请考虑使用 StreamBridge: https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources.

Supplier<Flux> bean 是框架启动源数据生成逻辑的特殊信号。如果自己动手,就不会那么简单了。我不是说你的 Flux.fromStream(Stream.generate()) 可以用一个 Flux.generate() 代替。也不清楚为什么我要使用特殊的调度程序并共享...

如果你不想做复杂的Flux逻辑,还有一个@PollableBean方法:https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_suppliers_sources