Spring WebFlux Web 客户端 - 迭代分页 REST API

Spring WebFlux Web Client - Iterating paged REST API

我需要从可分页 REST API 的所有页面中获取项目。我还需要在项目可用时立即开始处理项目,而不需要等待所有页面加载完毕。为此,我正在使用 Spring WebFlux 及其 WebClient,并希望 return Flux<Item>。 此外,我正在使用的 REST API 是有速率限制的,并且对它的每个响应都包含 headers 以及当前限制的详细信息:

单个页面请求的响应如下所示:

{
    "data": [],
    "meta": {
      "pagination": {
        "total": 10,
        "current": 1
      }
    }
}

数据数组包含实际项目,而元 object 包含分页信息。

我当前的解决方案首先执行一个 "dummy" 请求,只是为了获取总页数和速率限制。

Mono<T> paginated = client.get()
    .uri(uri)
    .exchange()
    .flatMap(response -> {                  
        HttpHeaders headers = response.headers().asHttpHeaders();

        Limits limits = new Limits();
        limits.setWindowSize(headers.getFirst("X-Window-Size"));
        limits.setWindowRemaining(headers.getFirst("X-Window-Remaining"));
        limits.setRequestsQuota(headers.getFirst("X-Requests-Quota");
        limits.setRequestsLeft(headers.getFirst("X-Requests-Remaining");

        return response.bodyToMono(Paginated.class)
                .map(paginated -> { 
                    paginated.setLimits(limits);
                    return paginated;
                });
    });

之后,我发出一个包含页码的 Flux,并且对于每一页,我执行一个 REST API 请求,每个请求都被足够延迟,因此它不会超过限制,并且 return 提取项目的通量:

return paginated.flatMapMany(paginated -> {
    return Flux.range(1, paginated.getMeta().getPagination().getTotal())
            .delayElements(Duration.ofMillis(paginated.getLimits().getWindowRemaining() / paginated.getLimits().getRequestsQuota()))
            .flatMap(page -> {
                return client.get()
                        .uri(pageUri)
                        .retrieve()
                        .bodyToMono(Item.class)
                        .flatMapMany(p -> Flux.fromIterable(p.getData()));
            });
});

这确实有效,但我对此并不满意,因为:

所以我的问题是如何重构它,使其不需要初始请求(而是从第一个请求中获取限制、页码和数据,并继续浏览所有页面,同时更新(并尊重)限制。

我认为这段代码可以满足您的需求。这个想法是创建一个 flux 来调用您的资源服务器,但在处理响应的过程中,在该 flux 上添加一个新事件以便能够调用下一页。

代码组成:

包含要调用的下一页和执行调用之前要等待的延迟的简单包装器

private class WaitAndNext{
    private String next;
    private long delay;
}

将进行 HTTP 调用并处理响应的 FluxProcessor:

FluxProcessor<WaitAndNext, WaitAndNext> processor= DirectProcessor.<WaitAndNext>create();
FluxSink<WaitAndNext> sink=processor.sink();

processor
    .flatMap(x-> Mono.just(x).delayElement(Duration.ofMillis(x.delay)))
    .map(x-> WebClient.builder()
    .baseUrl(x.next)
    .defaultHeader("Accept","application/json")
    .build())
    .flatMap(x->x.get()        
                 .exchange()
                 .flatMapMany(z->manageResponse(sink, z))
            )
    .subscribe(........);

我用一个只管理响应的方法拆分代码:它只是解包你的数据并向接收器添加一个新事件(该事件在给定延迟后调用下一页)

private Flux<Data> manageResponse(FluxSink<WaitAndNext> sink, ClientResponse resp) {

    if (resp.statusCode()!= HttpStatus.OK){
        sink.error(new IllegalStateException("Status code invalid"));
    }

    WaitAndNext wn=new WaitAndNext();
    HttpHeaders headers=resp.headers().asHttpHeaders();
    wn.delay= Integer.parseInt(headers.getFirst("X-Window-Remaining"))/ Integer.parseInt(headers.getFirst("X-Requests-Quota"));

    return resp.bodyToMono(Item.class)
        .flatMapMany(p -> {
            if (p.paginated.current==p.paginated.total){
                sink.complete();
            }else{
                wn.next="https://....?page="+(p.paginated.current+1);
                sink.next(wn);
            }
            return Flux.fromIterable(p.getData());
        });
}

现在我们只需要立即调用第一页的检索来初始化系统:

WaitAndNext wn=new WaitAndNext();
wn.next="https://....?page=1";
wn.delay=0;
sink.next(wn);