如何从异步回调创建多个 Flux
How to create multiple Flux from aysnc callbacks
从Reactor的参考指南中了解到Flux.create()
可用于将aysnc回调转换为Flux
。
然而,有时回调有多种方法来接收多种类型的数据,假设我有一段代码如下:
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
// consume state
}
@Override
public void onResultData(Result result) {
// consume result
}
});
如何将其转换为两个反应流:Flux<State>
和 Flux<Result>
?
一种方法是使用某些处理器,例如 DirectProcessor,您可以创建 2 个不同的处理器,并在发生事件时向处理器发送项目并订阅处理器,但如果您仍想使用 Flux.create,您可以这样做
Flux<Object> objectFlux;
@Override
public void run(String... args) throws Exception {
objectFlux = Flux.create(objectFluxSink ->
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
objectFluxSink.next(state);
}
@Override
public void onResultData(Result result) {
objectFluxSink.next(state);
}
}));
}
public Flux<Result> getResult(){
return objectFlux.filter(o -> o instanceof Result)
.map(o -> ((Result)o));
}
public Flux<State> geState(){
return objectFlux.filter(o -> o instanceof State)
.map(o -> ((State)o));
}
我仍然认为使用处理器应该更干净,你不需要进行过滤和转换,但你需要有 2 个处理器
像这样:
DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();
DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
stateDirectProcessor.onNext(state);
}
@Override
public void onResultData(Result result) {
resultDirectProcessor.onNext(result);
}
});
只是可用于给定任务的小片段。
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.EmitResult result = sink.tryEmitNext("some string");
从Reactor的参考指南中了解到Flux.create()
可用于将aysnc回调转换为Flux
。
然而,有时回调有多种方法来接收多种类型的数据,假设我有一段代码如下:
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
// consume state
}
@Override
public void onResultData(Result result) {
// consume result
}
});
如何将其转换为两个反应流:Flux<State>
和 Flux<Result>
?
一种方法是使用某些处理器,例如 DirectProcessor,您可以创建 2 个不同的处理器,并在发生事件时向处理器发送项目并订阅处理器,但如果您仍想使用 Flux.create,您可以这样做
Flux<Object> objectFlux;
@Override
public void run(String... args) throws Exception {
objectFlux = Flux.create(objectFluxSink ->
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
objectFluxSink.next(state);
}
@Override
public void onResultData(Result result) {
objectFluxSink.next(state);
}
}));
}
public Flux<Result> getResult(){
return objectFlux.filter(o -> o instanceof Result)
.map(o -> ((Result)o));
}
public Flux<State> geState(){
return objectFlux.filter(o -> o instanceof State)
.map(o -> ((State)o));
}
我仍然认为使用处理器应该更干净,你不需要进行过滤和转换,但你需要有 2 个处理器 像这样:
DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();
DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
stateDirectProcessor.onNext(state);
}
@Override
public void onResultData(Result result) {
resultDirectProcessor.onNext(result);
}
});
只是可用于给定任务的小片段。
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.EmitResult result = sink.tryEmitNext("some string");