Spring Webflux / Reactor 释放调用线程

Spring Webflux / Reactor release calling thread

所以我明白了在幕后Spring WebFlux 和 Reactor 对 nio 使用 netty,现在我想释放调用线程以释放资源以便处理更多请求。下面的简单代码是否释放了调用线程?

@GetMapping("/foo")
public Mono<Void> bar() {

  someService.veryLongSyncOperation();

  return Mono.empty();
}

我没有将服务调用包装在 Flux/Mono 中,我只是想首先验证调用线程是否在服务执行其长期工作时被释放。这足以实现调用线程释放吗?如果是这样,有没有办法测试这个?

我认为框架看到了 return 类型,这足以让它知道它必须释放调用线程。

没有。在这种情况下,您将在 Netty IO 线程中调用长 运行ning 进程。 我能想到的最简单的方法是 create Mono sink 和 运行 新线程中的 long op(或者可能通过线程池)。当操作成功完成时,您调用 sink.success(),如果失败,您调用 sink.error(x) 传递抛出的异常。

@GetMapping("/foo")
public Mono<Void> bar() {
    return Mono.create(sink -> {
        new Thread(() -> {
            try {
                someService.veryLongSyncOperation();
                sink.success();
            } catch (Exception ex) {
                sink.error(ex);
            }
        }).start();
    });  
}

调用线程returns 设置流程后立即,WebFlux 将订阅返回的Mono 将触发线程到运行 在新线程中。

您可以使用 .subscribeOn(Schedulers.elastic()),因为它在 reactor reference guide

中提到
@GetMapping("/foo")
public Mono<Void> bar() {
    return Mono.fromCallable(() -> someService.veryLongSyncOperation())
            .subscribeOn(Schedulers.elastic())
            .then();
}

each subscription will happen on a dedicated single-threaded worker from Schedulers.elastic().

更新: 现在有 Schedulers.boundedElastic() 调度程序。我建议默认使用它。