我可以在 Flux.generate 状态生成器中阻止远程调用吗
Can I have blocking remote call in Flux.generate state generator
我正在从一系列阻塞 REST API 调用中生成一个 Flux,每个调用都取决于前一个调用的结果。即
Result r1 = call(fromRow = 0);
Result r2 = call(fromRow = 0 + r1.size());
Result r3 = call(fromRow = 0 + r1.size() + r2.size());
...
这是我正在尝试的简化版本:
Flux.generate(() -> 0, (i, sink) -> {
Result r = slowRemoteCall(i);
if (r == null) {
sink.complete();
} else {
sink.next(r)
}
return i + r.size();
}, state -> {});
只是想知道状态生成器内部的阻塞调用slowRemoteCall
会不会成为问题?
提前感谢您的帮助!
可能会出问题!如果您有任何阻塞调用,您可以使用 schedulers
让特定线程池完成任务。
Flux.generate(.........)
.subscribeOn(Schedulers.boundedElastic())
更多信息:
使用 expand
操作员和反应式远程客户端(例如:Spring WebClient),您可以以反应式 non-blocking 方式实现:
slowRemoteCall(0)
.expand(result -> {
if (result.size() == 0) { // stop condition
return Mono.empty();
} else {
return slowRemoteCall(result.startIndex() + result.size()); // maintain state
}
})
Mono<Result> slowRemoteCall(int startIndex) {
// simulate latency (could be a WebClient call here)
return Mono.delay(Duration.ofMillis(200)).thenReturn(new Result(startIndex));
}
灵感来自 this blog post。
我正在从一系列阻塞 REST API 调用中生成一个 Flux,每个调用都取决于前一个调用的结果。即
Result r1 = call(fromRow = 0);
Result r2 = call(fromRow = 0 + r1.size());
Result r3 = call(fromRow = 0 + r1.size() + r2.size());
...
这是我正在尝试的简化版本:
Flux.generate(() -> 0, (i, sink) -> {
Result r = slowRemoteCall(i);
if (r == null) {
sink.complete();
} else {
sink.next(r)
}
return i + r.size();
}, state -> {});
只是想知道状态生成器内部的阻塞调用slowRemoteCall
会不会成为问题?
提前感谢您的帮助!
可能会出问题!如果您有任何阻塞调用,您可以使用 schedulers
让特定线程池完成任务。
Flux.generate(.........)
.subscribeOn(Schedulers.boundedElastic())
更多信息:
使用 expand
操作员和反应式远程客户端(例如:Spring WebClient),您可以以反应式 non-blocking 方式实现:
slowRemoteCall(0)
.expand(result -> {
if (result.size() == 0) { // stop condition
return Mono.empty();
} else {
return slowRemoteCall(result.startIndex() + result.size()); // maintain state
}
})
Mono<Result> slowRemoteCall(int startIndex) {
// simulate latency (could be a WebClient call here)
return Mono.delay(Duration.ofMillis(200)).thenReturn(new Result(startIndex));
}
灵感来自 this blog post。