Spring 反应式 - 收集一系列分页结果作为所有结果的单声道

Spring Reactive - collect a sequence of paged results as a Mono of all results

在我的 REST 服务中,我必须多次调用另一个 REST 服务才能获取结果列表的所有页面。该请求包含一个 from 字段,我需要在每个请求中增加该字段。响应包含一个 totalResults 字段 - 当我读取所有结果后,我需要停止调用其他服务,收集所有调用的所有结果并生成一个 Mono<List<Result>> 响应。

这是我目前所拥有的:

@Getter
public class Request {
    private int from;
    private int size = 1000;
    private String type;

    public Request(String type, int from) {
        this.type = type;
        this.from = from;
    }
}

@Getter
@Setter
public class Response {
    private Integer totalResults;
    private Integer size;
    private Integer from;
    private List<Result> results;
}

public Mono<List<Result>> findByType(String type) {
    return Flux.generate(
            () -> new Request(type, 0),
            (Request request, SynchronousSink<List<Result>> sink) -> {
                Response response = find(request).block();
                sink.next(response.getResults());
                int nextFrom = response.getFrom() + response.getSize();
                if (nextFrom >= response.getTotalResults()) {
                    sink.complete();
                }
                return new Request(type, nextFrom);
            })
            .flatMap(Flux::fromIterable)
            .collectList();
}

private Mono<Response> find(Request request) {
    return webClient
            .post()
            .uri("/search")
            .syncBody(request)
            .retrieve()
            .bodyToMono(Response.class);
}

它在使用 MockWebServerStepVerifier 的测试中有效,但在使用

的生产中失败
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

我怎样才能以正确的反应方式做到这一点?

编辑 在 Adam 的帮助下,expand 函数修复了这个问题

public Mono<List<Result>> findByType(Request request) {
        return find(request)
                .expand(response -> {
                    int nextFrom = response.getFrom() + response.getSize();
                    if (nextFrom >= response.getTotalResults()) {
                        return Mono.empty();
                    }
                    return find(new Request(request.getType(), response.getFrom() + response.getSize()));
                })
                 .flatMap(response -> Flux.fromIterable(response.getResults()))
                 .collectList();;
    }

    private Mono<Response> find(Request request) {
        return webClient
                .post()
                .uri("/search")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .syncBody(request)
                .retrieve()
                .bodyToMono(Response.class);
    }