在 JAX-RS 支持的服务器中使用反应器时如何处理阻塞调用?
How to handle blocking calls when using reactor in a JAX-RS-powered server?
要处理 HTTP 请求,我们必须将阻塞调用(例如 JDBC 调用)作为基于 Mono/Flux
的流程的一部分。我们目前的计划是这样的:
// I renamed getSomething to processJaxrsHttpRequest
CompletionStage<String> processJaxrsHttpRequest(String input) {
return Mono.just(input)
.map(in -> process(in))
.flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
.flatMap(str -> asyncHttpCall(str))
.flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
.toFuture();
}
其中 fixedScheduler
在 HTTP 请求中同时使用。
我们希望就这种在相当数量的通量内处理块调用的策略获得一些反馈。当然,我们明白,如果我们所有的请求都流经这些阻塞调用,那么我们还不如不使用反应堆(公认的良好处理 API 之外)。
更新:谢谢 bsideup for 。但是,我的问题应该更具体一些。
我的总体问题是如何有效地跨多个通量使用阻塞调用,这些通量可能 created/subscribed 大量。我们尝试了 the suggested approach,但它会导致线程爆炸和快速 OOM。所以我们正在考虑使用共享调度程序。所以..这是我的问题。
- 在我描述的情况下,您会建议使用共享调度程序 (
fixedScheduler
) 吗?如果没有,你能指点我吗?
- 如果使用共享调度程序很好,这会是一个很好的实现吗:
Schedulers.newParallel("blocking-scheduler", maxNumThreads)
?
更新 2:刚刚在 Schedulers#newParallel
上挖了个大坑,发现这行不通,因为它 'rejects' 会阻塞调用。
非常感谢任何提示!
虽然 subscribeOn
确实是处理阻塞调用的一种方法,您的用法没问题,但您也可以使用 publishOn
.
它将处理移动到提供的 Scheduler
,除非指定了其他 publishOn
:
CompletionStage<String> getSomething(String input) {
return Mono.just(input)
.map(in -> process(in)) // process must be non-blocking, or go after publishOn
.publishOn(Schedulers.boundedElastic())
.map(::jdbcCall)
.flatMap(str -> asyncHttpCall(str))
.publishOn(Schedulers.boundedElastic())
.map(::jdbcCall)
.toFuture();
}
如您所见,您也可以继续使用异步调用。只要确保你没有阻止 non-blocking 调度程序(在那个例子中,我在 flatMap
之后再次使用 publishOn
因为 asyncHttpCall
可能从 non-blocking 调度程序完成)
要处理 HTTP 请求,我们必须将阻塞调用(例如 JDBC 调用)作为基于 Mono/Flux
的流程的一部分。我们目前的计划是这样的:
// I renamed getSomething to processJaxrsHttpRequest
CompletionStage<String> processJaxrsHttpRequest(String input) {
return Mono.just(input)
.map(in -> process(in))
.flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
.flatMap(str -> asyncHttpCall(str))
.flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
.toFuture();
}
其中 fixedScheduler
在 HTTP 请求中同时使用。
我们希望就这种在相当数量的通量内处理块调用的策略获得一些反馈。当然,我们明白,如果我们所有的请求都流经这些阻塞调用,那么我们还不如不使用反应堆(公认的良好处理 API 之外)。
更新:谢谢 bsideup for
我的总体问题是如何有效地跨多个通量使用阻塞调用,这些通量可能 created/subscribed 大量。我们尝试了 the suggested approach,但它会导致线程爆炸和快速 OOM。所以我们正在考虑使用共享调度程序。所以..这是我的问题。
- 在我描述的情况下,您会建议使用共享调度程序 (
fixedScheduler
) 吗?如果没有,你能指点我吗? - 如果使用共享调度程序很好,这会是一个很好的实现吗:
Schedulers.newParallel("blocking-scheduler", maxNumThreads)
?
更新 2:刚刚在 Schedulers#newParallel
上挖了个大坑,发现这行不通,因为它 'rejects' 会阻塞调用。
非常感谢任何提示!
虽然 subscribeOn
确实是处理阻塞调用的一种方法,您的用法没问题,但您也可以使用 publishOn
.
它将处理移动到提供的 Scheduler
,除非指定了其他 publishOn
:
CompletionStage<String> getSomething(String input) {
return Mono.just(input)
.map(in -> process(in)) // process must be non-blocking, or go after publishOn
.publishOn(Schedulers.boundedElastic())
.map(::jdbcCall)
.flatMap(str -> asyncHttpCall(str))
.publishOn(Schedulers.boundedElastic())
.map(::jdbcCall)
.toFuture();
}
如您所见,您也可以继续使用异步调用。只要确保你没有阻止 non-blocking 调度程序(在那个例子中,我在 flatMap
之后再次使用 publishOn
因为 asyncHttpCall
可能从 non-blocking 调度程序完成)