如何控制 Flux.flatMap (Mono) 的并行度?
How to control parallelism of Flux.flatMap (Mono)?
下面的代码并行执行所有网络请求 (webClient),不遵守我在 parallel(5)
.
中设置的限制
Flux.fromIterable(dataListWithHundredsElements)
.parallel(5).runOn(Schedulers.boundedElastic())
.flatMap(element ->
webClient.post().
.bodyValue(element)
.retrieve()
.bodyToMono(String.class)
.doOnError(err -> element.setError(Utils.toString(err)))
.doOnSuccess(r -> element.setResponse(r))
)
.sequential()
.onErrorContinue((e, v) -> {})
.doOnComplete(() -> updateInDatabase(dataListWithHundresdElements))
.subscribe();
我想知道是否可以根据 parallel(5)
中指定的值执行请求,以及如何最好地做到这一点?
一个细节,这段代码是一个 Spring MVC 应用程序,我正在请求外部服务。
更新 01
事实上 Flux 创建了 5 个线程,但是,所有请求 (WebClient Mono) 都是同时执行的。
我想要一次执行 5 个请求,所以当 1 个请求结束时,另一个请求开始,但任何时候并行的请求都不应超过 5 个。
由于 Mono 也是一种反应类型,在我看来,Flux 的 5 个线程调用它并且没有被阻塞,实际上发生的情况是所有请求都是并行发生的。
更新 02 - 外部服务日志
这是外部服务的日志,大约需要 5 秒才能响应。正如您在下面的日志中看到的,同时有 14 个请求。
2020-05-08 11:53:56.655 INFO 28223 --- [nio-8080-exec-8] EXTERNAL SERVICE LOG {"id": 21} http-nio-8080-exec-8
2020-05-08 11:53:56.655 INFO 28223 --- [nio-8080-exec-7] EXTERNAL SERVICE LOG {"id": 20} http-nio-8080-exec-7
2020-05-08 11:53:56.659 INFO 28223 --- [nio-8080-exec-2] EXTERNAL SERVICE LOG {"id": 27} http-nio-8080-exec-2
2020-05-08 11:53:56.659 INFO 28223 --- [nio-8080-exec-6] EXTERNAL SERVICE LOG {"id": 19} http-nio-8080-exec-6
2020-05-08 11:53:56.659 INFO 28223 --- [io-8080-exec-10] EXTERNAL SERVICE LOG {"id": 23} http-nio-8080-exec-10
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-5] EXTERNAL SERVICE LOG {"id": 18} http-nio-8080-exec-5
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-9] EXTERNAL SERVICE LOG {"id": 17} http-nio-8080-exec-9
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-1] EXTERNAL SERVICE LOG {"id": 29} http-nio-8080-exec-1
2020-05-08 11:53:56.661 INFO 28223 --- [nio-8080-exec-4] EXTERNAL SERVICE LOG {"id": 24} http-nio-8080-exec-4
2020-05-08 11:53:56.666 INFO 28223 --- [io-8080-exec-11] EXTERNAL SERVICE LOG {"id": 25} http-nio-8080-exec-11
2020-05-08 11:53:56.675 INFO 28223 --- [io-8080-exec-13] EXTERNAL SERVICE LOG {"id": 42} http-nio-8080-exec-13
2020-05-08 11:53:56.678 INFO 28223 --- [io-8080-exec-14] EXTERNAL SERVICE LOG {"id": 28} http-nio-8080-exec-14
2020-05-08 11:53:56.680 INFO 28223 --- [io-8080-exec-12] EXTERNAL SERVICE LOG {"id": 26} http-nio-8080-exec-12
2020-05-08 11:53:56.686 INFO 28223 --- [io-8080-exec-15] EXTERNAL SERVICE LOG {"id": 22} http-nio-8080-exec-15
更新 03 - 反应器日志
正在加固,外部服务大约需要5秒响应。然而,可以看到所有请求 (14) 几乎是同时发出的。
2020-05-08 11:53:56.051 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.053 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.081 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.081 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.082 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.082 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.093 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.093 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.094 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.095 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.110 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@40ddcd53)
2020-05-08 11:53:56.112 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onNext(@200e0819)
2020-05-08 11:53:56.112 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@3b81eee2)
2020-05-08 11:53:56.113 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@60af2a4d)
2020-05-08 11:53:56.115 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@723db553)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@387743b5)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@62ed2f8d)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onNext(@1a40554a)
2020-05-08 11:53:56.442 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@1bcb696a)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@46c98823)
2020-05-08 11:53:56.443 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.446 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.442 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@1c0da4a)
2020-05-08 11:53:56.448 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.452 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@14d54d26)
2020-05-08 11:53:56.453 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.490 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@46e43af)
2020-05-08 11:53:56.492 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@5ca02355)
2020-05-08 11:53:56.496 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onComplete()
您可以使用ParallelFlux#flatMap(Function<? super T,? extends Publisher<? extends R>>, boolean, int)
方法来控制并发。
对于您的情况,它可能是:
.flatMap(element ->
webClient.post().
.bodyValue(element)
.retrieve()
.bodyToMono(String.class)
.doOnError(err -> element.setError(Utils.toString(err)))
.doOnSuccess(r -> element.setResponse(r)),
false, 1
)
但是,实际上,您不必创建 ParallelFlux
。只需使用 Flux#flatMap(Function<? super T,? extends Publisher<? extends V>>, int)
方法:
Flux.fromIterable(dataListWithHundredsElements)
.flatMap(element -> webclient.post()..., 5)
...
flatMap
方法的第二个参数负责并发。
下面的代码并行执行所有网络请求 (webClient),不遵守我在 parallel(5)
.
Flux.fromIterable(dataListWithHundredsElements)
.parallel(5).runOn(Schedulers.boundedElastic())
.flatMap(element ->
webClient.post().
.bodyValue(element)
.retrieve()
.bodyToMono(String.class)
.doOnError(err -> element.setError(Utils.toString(err)))
.doOnSuccess(r -> element.setResponse(r))
)
.sequential()
.onErrorContinue((e, v) -> {})
.doOnComplete(() -> updateInDatabase(dataListWithHundresdElements))
.subscribe();
我想知道是否可以根据 parallel(5)
中指定的值执行请求,以及如何最好地做到这一点?
一个细节,这段代码是一个 Spring MVC 应用程序,我正在请求外部服务。
更新 01
事实上 Flux 创建了 5 个线程,但是,所有请求 (WebClient Mono) 都是同时执行的。
我想要一次执行 5 个请求,所以当 1 个请求结束时,另一个请求开始,但任何时候并行的请求都不应超过 5 个。
由于 Mono 也是一种反应类型,在我看来,Flux 的 5 个线程调用它并且没有被阻塞,实际上发生的情况是所有请求都是并行发生的。
更新 02 - 外部服务日志
这是外部服务的日志,大约需要 5 秒才能响应。正如您在下面的日志中看到的,同时有 14 个请求。
2020-05-08 11:53:56.655 INFO 28223 --- [nio-8080-exec-8] EXTERNAL SERVICE LOG {"id": 21} http-nio-8080-exec-8
2020-05-08 11:53:56.655 INFO 28223 --- [nio-8080-exec-7] EXTERNAL SERVICE LOG {"id": 20} http-nio-8080-exec-7
2020-05-08 11:53:56.659 INFO 28223 --- [nio-8080-exec-2] EXTERNAL SERVICE LOG {"id": 27} http-nio-8080-exec-2
2020-05-08 11:53:56.659 INFO 28223 --- [nio-8080-exec-6] EXTERNAL SERVICE LOG {"id": 19} http-nio-8080-exec-6
2020-05-08 11:53:56.659 INFO 28223 --- [io-8080-exec-10] EXTERNAL SERVICE LOG {"id": 23} http-nio-8080-exec-10
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-5] EXTERNAL SERVICE LOG {"id": 18} http-nio-8080-exec-5
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-9] EXTERNAL SERVICE LOG {"id": 17} http-nio-8080-exec-9
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-1] EXTERNAL SERVICE LOG {"id": 29} http-nio-8080-exec-1
2020-05-08 11:53:56.661 INFO 28223 --- [nio-8080-exec-4] EXTERNAL SERVICE LOG {"id": 24} http-nio-8080-exec-4
2020-05-08 11:53:56.666 INFO 28223 --- [io-8080-exec-11] EXTERNAL SERVICE LOG {"id": 25} http-nio-8080-exec-11
2020-05-08 11:53:56.675 INFO 28223 --- [io-8080-exec-13] EXTERNAL SERVICE LOG {"id": 42} http-nio-8080-exec-13
2020-05-08 11:53:56.678 INFO 28223 --- [io-8080-exec-14] EXTERNAL SERVICE LOG {"id": 28} http-nio-8080-exec-14
2020-05-08 11:53:56.680 INFO 28223 --- [io-8080-exec-12] EXTERNAL SERVICE LOG {"id": 26} http-nio-8080-exec-12
2020-05-08 11:53:56.686 INFO 28223 --- [io-8080-exec-15] EXTERNAL SERVICE LOG {"id": 22} http-nio-8080-exec-15
更新 03 - 反应器日志
正在加固,外部服务大约需要5秒响应。然而,可以看到所有请求 (14) 几乎是同时发出的。
2020-05-08 11:53:56.051 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.053 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.081 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.081 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.082 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.082 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.093 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.093 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.094 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.095 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.110 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@40ddcd53)
2020-05-08 11:53:56.112 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onNext(@200e0819)
2020-05-08 11:53:56.112 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@3b81eee2)
2020-05-08 11:53:56.113 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@60af2a4d)
2020-05-08 11:53:56.115 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@723db553)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@387743b5)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@62ed2f8d)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onNext(@1a40554a)
2020-05-08 11:53:56.442 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@1bcb696a)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@46c98823)
2020-05-08 11:53:56.443 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.446 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.442 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@1c0da4a)
2020-05-08 11:53:56.448 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.452 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@14d54d26)
2020-05-08 11:53:56.453 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.490 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@46e43af)
2020-05-08 11:53:56.492 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@5ca02355)
2020-05-08 11:53:56.496 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onComplete()
您可以使用ParallelFlux#flatMap(Function<? super T,? extends Publisher<? extends R>>, boolean, int)
方法来控制并发。
对于您的情况,它可能是:
.flatMap(element ->
webClient.post().
.bodyValue(element)
.retrieve()
.bodyToMono(String.class)
.doOnError(err -> element.setError(Utils.toString(err)))
.doOnSuccess(r -> element.setResponse(r)),
false, 1
)
但是,实际上,您不必创建 ParallelFlux
。只需使用 Flux#flatMap(Function<? super T,? extends Publisher<? extends V>>, int)
方法:
Flux.fromIterable(dataListWithHundredsElements)
.flatMap(element -> webclient.post()..., 5)
...
flatMap
方法的第二个参数负责并发。