Webflux 并行连接以某种方式限制为 256
Webflux parallel connections somehow limited to 256
我有一个简单的服务器和客户端设置:
Flux.range(1, 5000)
.subscribeOn(Schedulers.parallel())
.flatMap(i -> WebClient.create()
.method(HttpMethod.POST)
.uri("http://localhost:8080/test")
.body(Mono.just(String.valueOf(i)), String.class)
.exchange())
.publishOn(Schedulers.parallel())
.subscribe(response ->
response.bodyToMono(String.class)
.publishOn(Schedulers.elastic())
.subscribe(body -> log.info("{}", body)));
这里是客户:
@PostMapping
public Mono<String> test(@RequestBody Mono<String> body) {
return body.delayElement(Duration.ofSeconds(5));
}
这两件事 运行 在 netty 上。也许有人知道是什么导致了这种行为?
这不是由于有关连接池的 WebClient
限制,而是实际上来自您可以更改的 Reactor 实现细节。
默认情况下,flatMap
等 Reactor 运算符具有 prefetch=32
(在最终订阅者请求之前我们请求的元素数)和 maxConcurrency=256
(最大元素数由运营商并发处理)。
您可以使用 Flux.flatMap(Function mapper, int concurrency, int prefetch)
的变体来改变该行为。
您的代码片段混合使用了 subscribeOn
和 publishOn
;我想说的是,鉴于您正在使用此代码片段进行反应性 I/O 工作,您不应该尝试在 elastic/parallel 调度程序上安排工作。最好在此处删除这些运算符。
我有一个简单的服务器和客户端设置:
Flux.range(1, 5000)
.subscribeOn(Schedulers.parallel())
.flatMap(i -> WebClient.create()
.method(HttpMethod.POST)
.uri("http://localhost:8080/test")
.body(Mono.just(String.valueOf(i)), String.class)
.exchange())
.publishOn(Schedulers.parallel())
.subscribe(response ->
response.bodyToMono(String.class)
.publishOn(Schedulers.elastic())
.subscribe(body -> log.info("{}", body)));
这里是客户:
@PostMapping
public Mono<String> test(@RequestBody Mono<String> body) {
return body.delayElement(Duration.ofSeconds(5));
}
这两件事 运行 在 netty 上。也许有人知道是什么导致了这种行为?
这不是由于有关连接池的 WebClient
限制,而是实际上来自您可以更改的 Reactor 实现细节。
默认情况下,flatMap
等 Reactor 运算符具有 prefetch=32
(在最终订阅者请求之前我们请求的元素数)和 maxConcurrency=256
(最大元素数由运营商并发处理)。
您可以使用 Flux.flatMap(Function mapper, int concurrency, int prefetch)
的变体来改变该行为。
您的代码片段混合使用了 subscribeOn
和 publishOn
;我想说的是,鉴于您正在使用此代码片段进行反应性 I/O 工作,您不应该尝试在 elastic/parallel 调度程序上安排工作。最好在此处删除这些运算符。