Spring 反应式 - 收集一系列分页结果作为所有结果的单声道
Spring Reactive - collect a sequence of paged results as a Mono of all results
在我的 REST 服务中,我必须多次调用另一个 REST 服务才能获取结果列表的所有页面。该请求包含一个 from
字段,我需要在每个请求中增加该字段。响应包含一个 totalResults
字段 - 当我读取所有结果后,我需要停止调用其他服务,收集所有调用的所有结果并生成一个 Mono<List<Result>>
响应。
这是我目前所拥有的:
@Getter
public class Request {
private int from;
private int size = 1000;
private String type;
public Request(String type, int from) {
this.type = type;
this.from = from;
}
}
@Getter
@Setter
public class Response {
private Integer totalResults;
private Integer size;
private Integer from;
private List<Result> results;
}
public Mono<List<Result>> findByType(String type) {
return Flux.generate(
() -> new Request(type, 0),
(Request request, SynchronousSink<List<Result>> sink) -> {
Response response = find(request).block();
sink.next(response.getResults());
int nextFrom = response.getFrom() + response.getSize();
if (nextFrom >= response.getTotalResults()) {
sink.complete();
}
return new Request(type, nextFrom);
})
.flatMap(Flux::fromIterable)
.collectList();
}
private Mono<Response> find(Request request) {
return webClient
.post()
.uri("/search")
.syncBody(request)
.retrieve()
.bodyToMono(Response.class);
}
它在使用 MockWebServer
和 StepVerifier
的测试中有效,但在使用
的生产中失败
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
我怎样才能以正确的反应方式做到这一点?
编辑 在 Adam 的帮助下,expand
函数修复了这个问题
public Mono<List<Result>> findByType(Request request) {
return find(request)
.expand(response -> {
int nextFrom = response.getFrom() + response.getSize();
if (nextFrom >= response.getTotalResults()) {
return Mono.empty();
}
return find(new Request(request.getType(), response.getFrom() + response.getSize()));
})
.flatMap(response -> Flux.fromIterable(response.getResults()))
.collectList();;
}
private Mono<Response> find(Request request) {
return webClient
.post()
.uri("/search")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.syncBody(request)
.retrieve()
.bodyToMono(Response.class);
}
在我的 REST 服务中,我必须多次调用另一个 REST 服务才能获取结果列表的所有页面。该请求包含一个 from
字段,我需要在每个请求中增加该字段。响应包含一个 totalResults
字段 - 当我读取所有结果后,我需要停止调用其他服务,收集所有调用的所有结果并生成一个 Mono<List<Result>>
响应。
这是我目前所拥有的:
@Getter
public class Request {
private int from;
private int size = 1000;
private String type;
public Request(String type, int from) {
this.type = type;
this.from = from;
}
}
@Getter
@Setter
public class Response {
private Integer totalResults;
private Integer size;
private Integer from;
private List<Result> results;
}
public Mono<List<Result>> findByType(String type) {
return Flux.generate(
() -> new Request(type, 0),
(Request request, SynchronousSink<List<Result>> sink) -> {
Response response = find(request).block();
sink.next(response.getResults());
int nextFrom = response.getFrom() + response.getSize();
if (nextFrom >= response.getTotalResults()) {
sink.complete();
}
return new Request(type, nextFrom);
})
.flatMap(Flux::fromIterable)
.collectList();
}
private Mono<Response> find(Request request) {
return webClient
.post()
.uri("/search")
.syncBody(request)
.retrieve()
.bodyToMono(Response.class);
}
它在使用 MockWebServer
和 StepVerifier
的测试中有效,但在使用
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
我怎样才能以正确的反应方式做到这一点?
编辑 在 Adam 的帮助下,expand
函数修复了这个问题
public Mono<List<Result>> findByType(Request request) {
return find(request)
.expand(response -> {
int nextFrom = response.getFrom() + response.getSize();
if (nextFrom >= response.getTotalResults()) {
return Mono.empty();
}
return find(new Request(request.getType(), response.getFrom() + response.getSize()));
})
.flatMap(response -> Flux.fromIterable(response.getResults()))
.collectList();;
}
private Mono<Response> find(Request request) {
return webClient
.post()
.uri("/search")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.syncBody(request)
.retrieve()
.bodyToMono(Response.class);
}