如何将专有流从 spring webflux 转换为 Flux

How to turn a proprietary stream into a Flux from spring webflux

我有一个自定义事件总线,我可以在其中订阅类似

的 lambda
 bus.subscribe(topic, event -> {/*gets executed for every new event*/}, exception -> {})

现在 lambda 显然 运行 在不同的线程中。现在我的问题是如何将这种接口连接到 Flux<Event>?我必须自己写 Publisher 吗?但是人们说这样做不是一个好主意。

模拟实现是

import java.util.function.Consumer

class Mock extends Thread {
    Consumer<String> lambda

    public Mock(Consumer<String> lambda) {
        this.lambda = lambda
    }

    @Override
    void run() {
        while(true) {
            Thread.sleep(1000)
            lambda.accept("lala")
        }
    }
}

Flux<String> flux = new Mock({ /*TODO write to flux*/ }).start()

你是对的,你不应该实现你自己的发布者。在大多数情况下,您也不必处理线程,而是依赖 Flux.

上的静态方法

类似于:

Flux<Event> events = Flux.<Event>create(emitter -> {

     bus.subscribe(topic, event -> emitter.next(event),
         exc -> emitter.error(exc));

     // you should also unsubscribe
     emitter.onDispose(() -> {
         bus.unsubscribe(topic, ...);
     });
 });