如何将专有流从 spring webflux 转换为 Flux
How to turn a proprietary stream into a Flux from spring webflux
我有一个自定义事件总线,我可以在其中订阅类似
的 lambda
bus.subscribe(topic, event -> {/*gets executed for every new event*/}, exception -> {})
现在 lambda 显然 运行 在不同的线程中。现在我的问题是如何将这种接口连接到 Flux<Event>
?我必须自己写 Publisher
吗?但是人们说这样做不是一个好主意。
模拟实现是
import java.util.function.Consumer
class Mock extends Thread {
Consumer<String> lambda
public Mock(Consumer<String> lambda) {
this.lambda = lambda
}
@Override
void run() {
while(true) {
Thread.sleep(1000)
lambda.accept("lala")
}
}
}
Flux<String> flux = new Mock({ /*TODO write to flux*/ }).start()
你是对的,你不应该实现你自己的发布者。在大多数情况下,您也不必处理线程,而是依赖 Flux
.
上的静态方法
类似于:
Flux<Event> events = Flux.<Event>create(emitter -> {
bus.subscribe(topic, event -> emitter.next(event),
exc -> emitter.error(exc));
// you should also unsubscribe
emitter.onDispose(() -> {
bus.unsubscribe(topic, ...);
});
});
我有一个自定义事件总线,我可以在其中订阅类似
的 lambda bus.subscribe(topic, event -> {/*gets executed for every new event*/}, exception -> {})
现在 lambda 显然 运行 在不同的线程中。现在我的问题是如何将这种接口连接到 Flux<Event>
?我必须自己写 Publisher
吗?但是人们说这样做不是一个好主意。
模拟实现是
import java.util.function.Consumer
class Mock extends Thread {
Consumer<String> lambda
public Mock(Consumer<String> lambda) {
this.lambda = lambda
}
@Override
void run() {
while(true) {
Thread.sleep(1000)
lambda.accept("lala")
}
}
}
Flux<String> flux = new Mock({ /*TODO write to flux*/ }).start()
你是对的,你不应该实现你自己的发布者。在大多数情况下,您也不必处理线程,而是依赖 Flux
.
类似于:
Flux<Event> events = Flux.<Event>create(emitter -> {
bus.subscribe(topic, event -> emitter.next(event),
exc -> emitter.error(exc));
// you should also unsubscribe
emitter.onDispose(() -> {
bus.unsubscribe(topic, ...);
});
});