从一系列分页网络调用中生成 Spring Flux

Generating a Spring Flux from a sequence of paged network calls

我正在使用 Spring 反应式 WebFlux 客户端调用 API、api.magicthegathering.io/v1/cards。响应是包含 100 张卡片的页面,以及包含 "next" 和 "last" 页面链接的 headers,例如"last" 是 api.magicthegathering.io/v1/cards?page=426(而 "next" 就是 n+1)。我想生成一个 Flux<Card> 单独提供每张卡片,有一个入口点,例如Flux<Card> getAllCards().

我目前有一个 CardsClient 组件 returns 一个 Mono<CardPage>CardPage 有一个 cards() 方法,其中 returns 所有卡片(这是 API 响应模型的 1:1 表示)。最重要的是,我有一个 CardCatalog 组件,上面有 getAllCards() 方法。

我尝试过使用 Flux::expandFlux::generate,它们有些工作,但这些实现有缺陷。

这是我当前 CardCatalog::getAllCards() 迭代的片段。问题是 expand 的递归性质导致对 client::getNextPage 的冗余调用;显然我没有使用正确的方法。

  @Override
  public Flux<Card> getAllCards() {
    return client.getFirstPage().flux().expand(client::getNextPage)
        .map(Page::cards)
        .flatMap(Flux::fromIterable)
        .map(mapper::convert)
        .cache();
  }

以前我使用的是 generate,但问题是它总是会抓取所有页面(相当慢),即使订阅者只决定 take(20) 卡片:

 @Override
  public Flux<Card> getAllCards() {
    final Flux<Page> pageFlux =
        generate(client::getFirstPage, (response, sink) -> {
          final var page = response.block();
          sink.next(page);

          if (page.next().isPresent()) {
            return client.getNextPage(page);
          }
          sink.complete();
          return null;
        });

    return pageFlux.flatMapIterable(Page::cards).map(mapper::convert);
  }

完整代码在这里:https://github.com/myersadamk/mtg-api-client

使用 expand,我向 client::getNextPage() 添加了打印。如您所见,创建的图表进行了冗余调用。

Getting https://api.magicthegathering.io/v1/cards?page=1
Getting https://api.magicthegathering.io/v1/cards?page=7
Getting https://api.magicthegathering.io/v1/cards?page=2
Getting https://api.magicthegathering.io/v1/cards?page=8
Getting https://api.magicthegathering.io/v1/cards?page=3
Getting https://api.magicthegathering.io/v1/cards?page=9
Getting https://api.magicthegathering.io/v1/cards?page=4
Getting https://api.magicthegathering.io/v1/cards?page=10
Getting https://api.magicthegathering.io/v1/cards?page=5
Getting https://api.magicthegathering.io/v1/cards?page=11
Getting https://api.magicthegathering.io/v1/cards?page=6
Getting https://api.magicthegathering.io/v1/cards?page=12
Getting https://api.magicthegathering.io/v1/cards?page=7

我想要更多这样的东西:

Getting https://api.magicthegathering.io/v1/cards?page=1
Getting https://api.magicthegathering.io/v1/cards?page=2
Getting https://api.magicthegathering.io/v1/cards?page=3

(最后说明:并行化并直接调用 URI 肯定会更快,但是绕过 next/last 机制和 hard-code URI 感觉有点傻。我可能会结束这样做,但仍然想解决这个问题。)

好的,我想出了一些有用的方法。我决定使用页面计数方法来尝试并行化,尽管它并没有更快,因为网络 IO 仍然是瓶颈。我可能会回到 header link 爬网并使用缓存。大致来说,神奇的数字和所有,这就是它的样子:

  @Override
  public Flux<Card> getAllCards() {
    return client.getPageCount().flatMapMany(pageCount ->
        Flux.concat(
            range(1, pageCount)
                .parallel(pageCount / 6).runOn(Schedulers.parallel())
                .map(client::getPage)
        ).map(Page::cards).flatMap(Flux::fromIterable).map(mapper::convert)
    );
  }

我认为这是顺序 non-blocking 方法:

public Flux<Card> getAllCards() {
    PaginationParams paginationParams = new PaginationParams();

    final Flux<Page> pageFlux = Mono
            .defer(() -> client.getPage(paginationParams))
            .doOnNext(page -> {
                if (page.next().isPresent()) {
                    paginationParams.setPage(page.next().get());
                } else {
                    paginationParams.setPage(null);
                }
            })
            .repeat(() -> paginationParams.getPage() != null);

    return pageFlux.flatMapIterable(Page::cards).map(mapper::convert);
}