使用 WebClient 和 Reactor 3.0 递归 API 调用
recursive API call with WebClient and Reactor 3.0
我终于开始学习使用 Reactor 进行函数式编程了。所以我是新手。
我要做的第一件事是使用 WebClient 调用外部 API。此调用需要递归,因为响应提供了调用参数的下一个值,我需要在下一次调用中使用它,直到满足一般情况。
结果如下:
Flux.from(p -> queryUntilNow())
.flatMap(res -> // res is object )
.subscribe( // process )
private Flux<ApiResp> queryUntilNow() {
return Flux.from(p -> {
queryAPI(since)
.doOnError(System.out::println)
.subscribe(apiResp -> {
if (since == apiResp.last) return;
since = apiResp.last;
queryUntilNow();
});
});
}
private Flux<ApiResp> queryAPI(int last) {
Flux<ApiResp> resp = kapi.get()
.uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
.retrieve()
.bodyToFlux(ApiResp.class);
return resp;
}
看来我需要更多地调整我对这种编程风格的思考,所以请给我一些例子和解释。
谢谢!
如果您只需要遍历成批返回的线性结果(而不是递归树),您可以使用重复通量,其起点在每次重复时都会发生变化。
这是一个完整的示例,它只是模拟 api 调用。您可以在 queryFrom
:
中替换您的 WebClient 调用
public class Main {
private static class ApiResp {
private final int last;
private ApiResp(int last) {
this.last = last;
}
}
public static void main(String[] args) {
queryBetween(0, 15)
.doOnNext(apiResp -> System.out.println(apiResp.last))
.blockLast();
}
public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
// The starting point of the next iteration
final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
return Flux
// defer will cause a new Flux with a new starting point to be created for each subscription
.defer(() -> queryFrom(nextIterationStart.get()))
// update the starting point of the next iteration
.doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
// repeat with a new subscription if we haven't reached the end yet
.repeat(() -> nextIterationStart.get() < endExclusive)
// make sure we didn't go past the end if queryFrom returned more results than we need
.takeWhile(apiResp -> apiResp.last < endExclusive);
}
public static Flux<ApiResp> queryFrom(int start) {
// simulates an api call that always returns 10 results from the starting point
return Flux.range(start, 10)
.map(ApiResp::new);
}
}
我终于开始学习使用 Reactor 进行函数式编程了。所以我是新手。
我要做的第一件事是使用 WebClient 调用外部 API。此调用需要递归,因为响应提供了调用参数的下一个值,我需要在下一次调用中使用它,直到满足一般情况。
结果如下:
Flux.from(p -> queryUntilNow())
.flatMap(res -> // res is object )
.subscribe( // process )
private Flux<ApiResp> queryUntilNow() {
return Flux.from(p -> {
queryAPI(since)
.doOnError(System.out::println)
.subscribe(apiResp -> {
if (since == apiResp.last) return;
since = apiResp.last;
queryUntilNow();
});
});
}
private Flux<ApiResp> queryAPI(int last) {
Flux<ApiResp> resp = kapi.get()
.uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
.retrieve()
.bodyToFlux(ApiResp.class);
return resp;
}
看来我需要更多地调整我对这种编程风格的思考,所以请给我一些例子和解释。
谢谢!
如果您只需要遍历成批返回的线性结果(而不是递归树),您可以使用重复通量,其起点在每次重复时都会发生变化。
这是一个完整的示例,它只是模拟 api 调用。您可以在 queryFrom
:
public class Main {
private static class ApiResp {
private final int last;
private ApiResp(int last) {
this.last = last;
}
}
public static void main(String[] args) {
queryBetween(0, 15)
.doOnNext(apiResp -> System.out.println(apiResp.last))
.blockLast();
}
public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
// The starting point of the next iteration
final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
return Flux
// defer will cause a new Flux with a new starting point to be created for each subscription
.defer(() -> queryFrom(nextIterationStart.get()))
// update the starting point of the next iteration
.doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
// repeat with a new subscription if we haven't reached the end yet
.repeat(() -> nextIterationStart.get() < endExclusive)
// make sure we didn't go past the end if queryFrom returned more results than we need
.takeWhile(apiResp -> apiResp.last < endExclusive);
}
public static Flux<ApiResp> queryFrom(int start) {
// simulates an api call that always returns 10 results from the starting point
return Flux.range(start, 10)
.map(ApiResp::new);
}
}