Spring Webflux / Reactor 释放调用线程
Spring Webflux / Reactor release calling thread
所以我明白了在幕后Spring WebFlux 和 Reactor 对 nio 使用 netty,现在我想释放调用线程以释放资源以便处理更多请求。下面的简单代码是否释放了调用线程?
@GetMapping("/foo")
public Mono<Void> bar() {
someService.veryLongSyncOperation();
return Mono.empty();
}
我没有将服务调用包装在 Flux/Mono 中,我只是想首先验证调用线程是否在服务执行其长期工作时被释放。这足以实现调用线程释放吗?如果是这样,有没有办法测试这个?
我认为框架看到了 return 类型,这足以让它知道它必须释放调用线程。
没有。在这种情况下,您将在 Netty IO 线程中调用长 运行ning 进程。
我能想到的最简单的方法是 create Mono sink 和 运行 新线程中的 long op(或者可能通过线程池)。当操作成功完成时,您调用 sink.success()
,如果失败,您调用 sink.error(x)
传递抛出的异常。
@GetMapping("/foo")
public Mono<Void> bar() {
return Mono.create(sink -> {
new Thread(() -> {
try {
someService.veryLongSyncOperation();
sink.success();
} catch (Exception ex) {
sink.error(ex);
}
}).start();
});
}
调用线程returns 设置流程后立即,WebFlux 将订阅返回的Mono 将触发线程到运行 在新线程中。
您可以使用 .subscribeOn(Schedulers.elastic())
,因为它在 reactor reference guide
中提到
@GetMapping("/foo")
public Mono<Void> bar() {
return Mono.fromCallable(() -> someService.veryLongSyncOperation())
.subscribeOn(Schedulers.elastic())
.then();
}
each subscription will happen on a dedicated single-threaded worker from Schedulers.elastic().
更新: 现在有 Schedulers.boundedElastic()
调度程序。我建议默认使用它。
所以我明白了在幕后Spring WebFlux 和 Reactor 对 nio 使用 netty,现在我想释放调用线程以释放资源以便处理更多请求。下面的简单代码是否释放了调用线程?
@GetMapping("/foo")
public Mono<Void> bar() {
someService.veryLongSyncOperation();
return Mono.empty();
}
我没有将服务调用包装在 Flux/Mono 中,我只是想首先验证调用线程是否在服务执行其长期工作时被释放。这足以实现调用线程释放吗?如果是这样,有没有办法测试这个?
我认为框架看到了 return 类型,这足以让它知道它必须释放调用线程。
没有。在这种情况下,您将在 Netty IO 线程中调用长 运行ning 进程。
我能想到的最简单的方法是 create Mono sink 和 运行 新线程中的 long op(或者可能通过线程池)。当操作成功完成时,您调用 sink.success()
,如果失败,您调用 sink.error(x)
传递抛出的异常。
@GetMapping("/foo")
public Mono<Void> bar() {
return Mono.create(sink -> {
new Thread(() -> {
try {
someService.veryLongSyncOperation();
sink.success();
} catch (Exception ex) {
sink.error(ex);
}
}).start();
});
}
调用线程returns 设置流程后立即,WebFlux 将订阅返回的Mono 将触发线程到运行 在新线程中。
您可以使用 .subscribeOn(Schedulers.elastic())
,因为它在 reactor reference guide
@GetMapping("/foo")
public Mono<Void> bar() {
return Mono.fromCallable(() -> someService.veryLongSyncOperation())
.subscribeOn(Schedulers.elastic())
.then();
}
each subscription will happen on a dedicated single-threaded worker from Schedulers.elastic().
更新: 现在有 Schedulers.boundedElastic()
调度程序。我建议默认使用它。