Spring WebFlux Web 客户端 - 迭代分页 REST API
Spring WebFlux Web Client - Iterating paged REST API
我需要从可分页 REST API 的所有页面中获取项目。我还需要在项目可用时立即开始处理项目,而不需要等待所有页面加载完毕。为此,我正在使用 Spring WebFlux 及其 WebClient,并希望 return Flux<Item>
。
此外,我正在使用的 REST API 是有速率限制的,并且对它的每个响应都包含 headers 以及当前限制的详细信息:
- 当前大小 window
- 当前剩余时间window
- 申请配额 window
- 当前 window
中剩余的请求
单个页面请求的响应如下所示:
{
"data": [],
"meta": {
"pagination": {
"total": 10,
"current": 1
}
}
}
数据数组包含实际项目,而元 object 包含分页信息。
我当前的解决方案首先执行一个 "dummy" 请求,只是为了获取总页数和速率限制。
Mono<T> paginated = client.get()
.uri(uri)
.exchange()
.flatMap(response -> {
HttpHeaders headers = response.headers().asHttpHeaders();
Limits limits = new Limits();
limits.setWindowSize(headers.getFirst("X-Window-Size"));
limits.setWindowRemaining(headers.getFirst("X-Window-Remaining"));
limits.setRequestsQuota(headers.getFirst("X-Requests-Quota");
limits.setRequestsLeft(headers.getFirst("X-Requests-Remaining");
return response.bodyToMono(Paginated.class)
.map(paginated -> {
paginated.setLimits(limits);
return paginated;
});
});
之后,我发出一个包含页码的 Flux,并且对于每一页,我执行一个 REST API 请求,每个请求都被足够延迟,因此它不会超过限制,并且 return 提取项目的通量:
return paginated.flatMapMany(paginated -> {
return Flux.range(1, paginated.getMeta().getPagination().getTotal())
.delayElements(Duration.ofMillis(paginated.getLimits().getWindowRemaining() / paginated.getLimits().getRequestsQuota()))
.flatMap(page -> {
return client.get()
.uri(pageUri)
.retrieve()
.bodyToMono(Item.class)
.flatMapMany(p -> Flux.fromIterable(p.getData()));
});
});
这确实有效,但我对此并不满意,因为:
- 初始"dummy"请求获取页数,然后
重复相同的请求以获取实际数据。
- 它仅在初始请求时获得速率限制,并假设
限制不会改变(例如,它是唯一使用 API 的)-
这可能不是真的,在这种情况下它会得到一个错误
超出限制。
所以我的问题是如何重构它,使其不需要初始请求(而是从第一个请求中获取限制、页码和数据,并继续浏览所有页面,同时更新(并尊重)限制。
我认为这段代码可以满足您的需求。这个想法是创建一个 flux 来调用您的资源服务器,但在处理响应的过程中,在该 flux 上添加一个新事件以便能够调用下一页。
代码组成:
包含要调用的下一页和执行调用之前要等待的延迟的简单包装器
private class WaitAndNext{
private String next;
private long delay;
}
将进行 HTTP 调用并处理响应的 FluxProcessor:
FluxProcessor<WaitAndNext, WaitAndNext> processor= DirectProcessor.<WaitAndNext>create();
FluxSink<WaitAndNext> sink=processor.sink();
processor
.flatMap(x-> Mono.just(x).delayElement(Duration.ofMillis(x.delay)))
.map(x-> WebClient.builder()
.baseUrl(x.next)
.defaultHeader("Accept","application/json")
.build())
.flatMap(x->x.get()
.exchange()
.flatMapMany(z->manageResponse(sink, z))
)
.subscribe(........);
我用一个只管理响应的方法拆分代码:它只是解包你的数据并向接收器添加一个新事件(该事件在给定延迟后调用下一页)
private Flux<Data> manageResponse(FluxSink<WaitAndNext> sink, ClientResponse resp) {
if (resp.statusCode()!= HttpStatus.OK){
sink.error(new IllegalStateException("Status code invalid"));
}
WaitAndNext wn=new WaitAndNext();
HttpHeaders headers=resp.headers().asHttpHeaders();
wn.delay= Integer.parseInt(headers.getFirst("X-Window-Remaining"))/ Integer.parseInt(headers.getFirst("X-Requests-Quota"));
return resp.bodyToMono(Item.class)
.flatMapMany(p -> {
if (p.paginated.current==p.paginated.total){
sink.complete();
}else{
wn.next="https://....?page="+(p.paginated.current+1);
sink.next(wn);
}
return Flux.fromIterable(p.getData());
});
}
现在我们只需要立即调用第一页的检索来初始化系统:
WaitAndNext wn=new WaitAndNext();
wn.next="https://....?page=1";
wn.delay=0;
sink.next(wn);
我需要从可分页 REST API 的所有页面中获取项目。我还需要在项目可用时立即开始处理项目,而不需要等待所有页面加载完毕。为此,我正在使用 Spring WebFlux 及其 WebClient,并希望 return Flux<Item>
。
此外,我正在使用的 REST API 是有速率限制的,并且对它的每个响应都包含 headers 以及当前限制的详细信息:
- 当前大小 window
- 当前剩余时间window
- 申请配额 window
- 当前 window 中剩余的请求
单个页面请求的响应如下所示:
{
"data": [],
"meta": {
"pagination": {
"total": 10,
"current": 1
}
}
}
数据数组包含实际项目,而元 object 包含分页信息。
我当前的解决方案首先执行一个 "dummy" 请求,只是为了获取总页数和速率限制。
Mono<T> paginated = client.get()
.uri(uri)
.exchange()
.flatMap(response -> {
HttpHeaders headers = response.headers().asHttpHeaders();
Limits limits = new Limits();
limits.setWindowSize(headers.getFirst("X-Window-Size"));
limits.setWindowRemaining(headers.getFirst("X-Window-Remaining"));
limits.setRequestsQuota(headers.getFirst("X-Requests-Quota");
limits.setRequestsLeft(headers.getFirst("X-Requests-Remaining");
return response.bodyToMono(Paginated.class)
.map(paginated -> {
paginated.setLimits(limits);
return paginated;
});
});
之后,我发出一个包含页码的 Flux,并且对于每一页,我执行一个 REST API 请求,每个请求都被足够延迟,因此它不会超过限制,并且 return 提取项目的通量:
return paginated.flatMapMany(paginated -> {
return Flux.range(1, paginated.getMeta().getPagination().getTotal())
.delayElements(Duration.ofMillis(paginated.getLimits().getWindowRemaining() / paginated.getLimits().getRequestsQuota()))
.flatMap(page -> {
return client.get()
.uri(pageUri)
.retrieve()
.bodyToMono(Item.class)
.flatMapMany(p -> Flux.fromIterable(p.getData()));
});
});
这确实有效,但我对此并不满意,因为:
- 初始"dummy"请求获取页数,然后 重复相同的请求以获取实际数据。
- 它仅在初始请求时获得速率限制,并假设 限制不会改变(例如,它是唯一使用 API 的)- 这可能不是真的,在这种情况下它会得到一个错误 超出限制。
所以我的问题是如何重构它,使其不需要初始请求(而是从第一个请求中获取限制、页码和数据,并继续浏览所有页面,同时更新(并尊重)限制。
我认为这段代码可以满足您的需求。这个想法是创建一个 flux 来调用您的资源服务器,但在处理响应的过程中,在该 flux 上添加一个新事件以便能够调用下一页。
代码组成:
包含要调用的下一页和执行调用之前要等待的延迟的简单包装器
private class WaitAndNext{
private String next;
private long delay;
}
将进行 HTTP 调用并处理响应的 FluxProcessor:
FluxProcessor<WaitAndNext, WaitAndNext> processor= DirectProcessor.<WaitAndNext>create();
FluxSink<WaitAndNext> sink=processor.sink();
processor
.flatMap(x-> Mono.just(x).delayElement(Duration.ofMillis(x.delay)))
.map(x-> WebClient.builder()
.baseUrl(x.next)
.defaultHeader("Accept","application/json")
.build())
.flatMap(x->x.get()
.exchange()
.flatMapMany(z->manageResponse(sink, z))
)
.subscribe(........);
我用一个只管理响应的方法拆分代码:它只是解包你的数据并向接收器添加一个新事件(该事件在给定延迟后调用下一页)
private Flux<Data> manageResponse(FluxSink<WaitAndNext> sink, ClientResponse resp) {
if (resp.statusCode()!= HttpStatus.OK){
sink.error(new IllegalStateException("Status code invalid"));
}
WaitAndNext wn=new WaitAndNext();
HttpHeaders headers=resp.headers().asHttpHeaders();
wn.delay= Integer.parseInt(headers.getFirst("X-Window-Remaining"))/ Integer.parseInt(headers.getFirst("X-Requests-Quota"));
return resp.bodyToMono(Item.class)
.flatMapMany(p -> {
if (p.paginated.current==p.paginated.total){
sink.complete();
}else{
wn.next="https://....?page="+(p.paginated.current+1);
sink.next(wn);
}
return Flux.fromIterable(p.getData());
});
}
现在我们只需要立即调用第一页的检索来初始化系统:
WaitAndNext wn=new WaitAndNext();
wn.next="https://....?page=1";
wn.delay=0;
sink.next(wn);