在 Project Reactor 管道中阻塞 I/O
Blocking I/O in Project Reactor pipeline
以下代码使用 Project Reactor 在有限数量的工作线程中分发阻塞 I/O 操作:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
...
List<Item> processItems(List<Item> items) {
final int parallelDegree = 10;
final Scheduler scheduler = Schedulers.newParallel("myScheduler", parallelDegree, true);
return Flux.fromIterable(items)
.parallel(parallelDegree)
.runOn(scheduler)
.map(this::doSomeBlockingIo)
.sequential()
.publishOn(Schedulers.immediate())
.collectList()
.block();
...
Item doSomeBlockingIo(Item item) {
// perform some non-deterministic, blocking I/O with side-effects
...
return someNewItem;
}
代码似乎按原样运行良好。但它是否健壮且惯用?
请注意,我已经检查并在 Project Reactor 文档(包括 JavaDocs)中没有看到任何明确禁止此用例的内容。
求朋友。
它会很好地工作,它很健壮,但是你使用并行调度程序来阻塞 IO 工作的事实并不是最佳的(并且不是特别地道;当有反应堆经验的人看到并行调度程序时,他们期待看到它 运行 非阻塞 IO。)
这里更好的方法是将您的并行调度程序换成 bounded elastic scheduler,您选择的上限(在您的示例中为 10)- 这将启动并根据需要重用支持工作程序,最多你的帽子。
以下代码使用 Project Reactor 在有限数量的工作线程中分发阻塞 I/O 操作:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
...
List<Item> processItems(List<Item> items) {
final int parallelDegree = 10;
final Scheduler scheduler = Schedulers.newParallel("myScheduler", parallelDegree, true);
return Flux.fromIterable(items)
.parallel(parallelDegree)
.runOn(scheduler)
.map(this::doSomeBlockingIo)
.sequential()
.publishOn(Schedulers.immediate())
.collectList()
.block();
...
Item doSomeBlockingIo(Item item) {
// perform some non-deterministic, blocking I/O with side-effects
...
return someNewItem;
}
代码似乎按原样运行良好。但它是否健壮且惯用?
请注意,我已经检查并在 Project Reactor 文档(包括 JavaDocs)中没有看到任何明确禁止此用例的内容。
求朋友。
它会很好地工作,它很健壮,但是你使用并行调度程序来阻塞 IO 工作的事实并不是最佳的(并且不是特别地道;当有反应堆经验的人看到并行调度程序时,他们期待看到它 运行 非阻塞 IO。)
这里更好的方法是将您的并行调度程序换成 bounded elastic scheduler,您选择的上限(在您的示例中为 10)- 这将启动并根据需要重用支持工作程序,最多你的帽子。