使用 reactor 等待多个 http 响应
Wait for multiple http response using reactor
我正在使用 Spring 反应堆进行多次 http 调用并等待所有结果。
这是我的代码:
final Map<String, WSSearchResult> reduced = new HashMap<>();
List<Mono<ClientResponse>> monos = new ArrayList<>();
for (int i=0;i<10;i++) {
log.info("Executing http call {}", i);
WSSearchRequest wsSearchRequest = WSSearchRequest.builder().param(i).build();
Mono<ClientResponse> exchange = webClient.post().uri("/search/availability")
.body(BodyInserters.fromObject(wsSearchRequest)).exchange();
monos.add(exchange);
}
final CountDownLatch latch = new CountDownLatch(monos.size());
Flux.merge(monos).subscribe(clientResponse -> {
List<WSSearchResult> partialResult = clientResponse.bodyToFlux(WSSearchResult.class).collectList().block();
List<WSSearchResult> partial =
partialResult.parallelStream().filter(w-> !Strings.isNullOrEmpty(w.getId())).
collect(Collectors.toList());
mapAndReduce(partial, reduced);
});
try {
latch.await(150, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
问题是上面的代码给出了异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-6
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-6
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.8.RELEASE.jar!/:3.2.8.RELEASE]
at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.8.RELEASE.jar!/:3.2.8.RELEASE]
我是反应堆开发的新手,我不知道为什么它给我这个例外。
有人能告诉我正确的方法吗?
感觉这段代码可以改写成下面的方式:
List<Mono<ClientResponse>> clientResponses = IntStream.range(0, 10)
.mapToObj(i -> WSSearchRequest.builder().param(i).build())
.map(request -> send(request))
.collect(toList());
Mono<Map<String, WSSearchResult>> reduced = Flux.merge(clientResponses)
.flatMap(clientResponse ->
clientResponse.bodyToFlux(WSSearchResult.class)
.filter(result -> !Strings.isNullOrEmpty(result.getId()))
.collectList()
.map(listOfFilteredResults -> map(listOfFilteredResults))
).reduce(new HashMap<>(), (left, right) -> reduce(left, right));
reduced
.timeout(Duration.ofSeconds(150))
.subscribe(result -> handle(result));
private Mono<ClientResponse> send(WSSearchRequest request) {
return webClient.post().uri("/search/availability")
.body(BodyInserters.fromObject(wsSearchRequest))
.exchange();
}
我正在使用 Spring 反应堆进行多次 http 调用并等待所有结果。 这是我的代码:
final Map<String, WSSearchResult> reduced = new HashMap<>();
List<Mono<ClientResponse>> monos = new ArrayList<>();
for (int i=0;i<10;i++) {
log.info("Executing http call {}", i);
WSSearchRequest wsSearchRequest = WSSearchRequest.builder().param(i).build();
Mono<ClientResponse> exchange = webClient.post().uri("/search/availability")
.body(BodyInserters.fromObject(wsSearchRequest)).exchange();
monos.add(exchange);
}
final CountDownLatch latch = new CountDownLatch(monos.size());
Flux.merge(monos).subscribe(clientResponse -> {
List<WSSearchResult> partialResult = clientResponse.bodyToFlux(WSSearchResult.class).collectList().block();
List<WSSearchResult> partial =
partialResult.parallelStream().filter(w-> !Strings.isNullOrEmpty(w.getId())).
collect(Collectors.toList());
mapAndReduce(partial, reduced);
});
try {
latch.await(150, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
问题是上面的代码给出了异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-6
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-6
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.8.RELEASE.jar!/:3.2.8.RELEASE]
at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.8.RELEASE.jar!/:3.2.8.RELEASE]
我是反应堆开发的新手,我不知道为什么它给我这个例外。
有人能告诉我正确的方法吗?
感觉这段代码可以改写成下面的方式:
List<Mono<ClientResponse>> clientResponses = IntStream.range(0, 10)
.mapToObj(i -> WSSearchRequest.builder().param(i).build())
.map(request -> send(request))
.collect(toList());
Mono<Map<String, WSSearchResult>> reduced = Flux.merge(clientResponses)
.flatMap(clientResponse ->
clientResponse.bodyToFlux(WSSearchResult.class)
.filter(result -> !Strings.isNullOrEmpty(result.getId()))
.collectList()
.map(listOfFilteredResults -> map(listOfFilteredResults))
).reduce(new HashMap<>(), (left, right) -> reduce(left, right));
reduced
.timeout(Duration.ofSeconds(150))
.subscribe(result -> handle(result));
private Mono<ClientResponse> send(WSSearchRequest request) {
return webClient.post().uri("/search/availability")
.body(BodyInserters.fromObject(wsSearchRequest))
.exchange();
}