在 Project Reactor 管道中阻塞 I/O

Blocking I/O in Project Reactor pipeline

以下代码使用 Project Reactor 在有限数量的工作线程中分发阻塞 I/O 操作:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

...

    List<Item> processItems(List<Item> items) {
        final int parallelDegree = 10;
        final Scheduler scheduler = Schedulers.newParallel("myScheduler", parallelDegree, true);

        return Flux.fromIterable(items)
            .parallel(parallelDegree)
            .runOn(scheduler)
            .map(this::doSomeBlockingIo)
            .sequential()
            .publishOn(Schedulers.immediate())
            .collectList()
            .block();

    ...

    Item doSomeBlockingIo(Item item) {
        // perform some non-deterministic, blocking I/O with side-effects
        ...
        return someNewItem;
    }

代码似乎按原样运行良好。但它是否健壮且惯用?

请注意,我已经检查并在 Project Reactor 文档(包括 JavaDocs)中没有看到任何明确禁止此用例的内容。

求朋友。

它会很好地工作,它很健壮,但是你使用并行调度程序来阻塞 IO 工作的事实并不是最佳的(并且不是特别地道;当有反应堆经验的人看到并行调度程序时,他们期待看到它 运行 非阻塞 IO。)

这里更好的方法是将您的并行调度程序换成 bounded elastic scheduler,您选择的上限(在您的示例中为 10)- 这将启动并根据需要重用支持工作程序,最多你的帽子。