从一系列分页网络调用中生成 Spring Flux
Generating a Spring Flux from a sequence of paged network calls
我正在使用 Spring 反应式 WebFlux 客户端调用 API、api.magicthegathering.io/v1/cards。响应是包含 100 张卡片的页面,以及包含 "next" 和 "last" 页面链接的 headers,例如"last" 是 api.magicthegathering.io/v1/cards?page=426
(而 "next" 就是 n+1)。我想生成一个 Flux<Card>
单独提供每张卡片,有一个入口点,例如Flux<Card> getAllCards()
.
我目前有一个 CardsClient
组件 returns 一个 Mono<CardPage>
。 CardPage
有一个 cards()
方法,其中 returns 所有卡片(这是 API 响应模型的 1:1 表示)。最重要的是,我有一个 CardCatalog
组件,上面有 getAllCards()
方法。
我尝试过使用 Flux::expand
和 Flux::generate
,它们有些工作,但这些实现有缺陷。
这是我当前 CardCatalog::getAllCards()
迭代的片段。问题是 expand
的递归性质导致对 client::getNextPage
的冗余调用;显然我没有使用正确的方法。
@Override
public Flux<Card> getAllCards() {
return client.getFirstPage().flux().expand(client::getNextPage)
.map(Page::cards)
.flatMap(Flux::fromIterable)
.map(mapper::convert)
.cache();
}
以前我使用的是 generate
,但问题是它总是会抓取所有页面(相当慢),即使订阅者只决定 take(20)
卡片:
@Override
public Flux<Card> getAllCards() {
final Flux<Page> pageFlux =
generate(client::getFirstPage, (response, sink) -> {
final var page = response.block();
sink.next(page);
if (page.next().isPresent()) {
return client.getNextPage(page);
}
sink.complete();
return null;
});
return pageFlux.flatMapIterable(Page::cards).map(mapper::convert);
}
完整代码在这里:https://github.com/myersadamk/mtg-api-client
使用 expand
,我向 client::getNextPage()
添加了打印。如您所见,创建的图表进行了冗余调用。
Getting https://api.magicthegathering.io/v1/cards?page=1
Getting https://api.magicthegathering.io/v1/cards?page=7
Getting https://api.magicthegathering.io/v1/cards?page=2
Getting https://api.magicthegathering.io/v1/cards?page=8
Getting https://api.magicthegathering.io/v1/cards?page=3
Getting https://api.magicthegathering.io/v1/cards?page=9
Getting https://api.magicthegathering.io/v1/cards?page=4
Getting https://api.magicthegathering.io/v1/cards?page=10
Getting https://api.magicthegathering.io/v1/cards?page=5
Getting https://api.magicthegathering.io/v1/cards?page=11
Getting https://api.magicthegathering.io/v1/cards?page=6
Getting https://api.magicthegathering.io/v1/cards?page=12
Getting https://api.magicthegathering.io/v1/cards?page=7
我想要更多这样的东西:
Getting https://api.magicthegathering.io/v1/cards?page=1
Getting https://api.magicthegathering.io/v1/cards?page=2
Getting https://api.magicthegathering.io/v1/cards?page=3
(最后说明:并行化并直接调用 URI 肯定会更快,但是绕过 next/last 机制和 hard-code URI 感觉有点傻。我可能会结束这样做,但仍然想解决这个问题。)
好的,我想出了一些有用的方法。我决定使用页面计数方法来尝试并行化,尽管它并没有更快,因为网络 IO 仍然是瓶颈。我可能会回到 header link 爬网并使用缓存。大致来说,神奇的数字和所有,这就是它的样子:
@Override
public Flux<Card> getAllCards() {
return client.getPageCount().flatMapMany(pageCount ->
Flux.concat(
range(1, pageCount)
.parallel(pageCount / 6).runOn(Schedulers.parallel())
.map(client::getPage)
).map(Page::cards).flatMap(Flux::fromIterable).map(mapper::convert)
);
}
我认为这是顺序 non-blocking 方法:
public Flux<Card> getAllCards() {
PaginationParams paginationParams = new PaginationParams();
final Flux<Page> pageFlux = Mono
.defer(() -> client.getPage(paginationParams))
.doOnNext(page -> {
if (page.next().isPresent()) {
paginationParams.setPage(page.next().get());
} else {
paginationParams.setPage(null);
}
})
.repeat(() -> paginationParams.getPage() != null);
return pageFlux.flatMapIterable(Page::cards).map(mapper::convert);
}
我正在使用 Spring 反应式 WebFlux 客户端调用 API、api.magicthegathering.io/v1/cards。响应是包含 100 张卡片的页面,以及包含 "next" 和 "last" 页面链接的 headers,例如"last" 是 api.magicthegathering.io/v1/cards?page=426
(而 "next" 就是 n+1)。我想生成一个 Flux<Card>
单独提供每张卡片,有一个入口点,例如Flux<Card> getAllCards()
.
我目前有一个 CardsClient
组件 returns 一个 Mono<CardPage>
。 CardPage
有一个 cards()
方法,其中 returns 所有卡片(这是 API 响应模型的 1:1 表示)。最重要的是,我有一个 CardCatalog
组件,上面有 getAllCards()
方法。
我尝试过使用 Flux::expand
和 Flux::generate
,它们有些工作,但这些实现有缺陷。
这是我当前 CardCatalog::getAllCards()
迭代的片段。问题是 expand
的递归性质导致对 client::getNextPage
的冗余调用;显然我没有使用正确的方法。
@Override
public Flux<Card> getAllCards() {
return client.getFirstPage().flux().expand(client::getNextPage)
.map(Page::cards)
.flatMap(Flux::fromIterable)
.map(mapper::convert)
.cache();
}
以前我使用的是 generate
,但问题是它总是会抓取所有页面(相当慢),即使订阅者只决定 take(20)
卡片:
@Override
public Flux<Card> getAllCards() {
final Flux<Page> pageFlux =
generate(client::getFirstPage, (response, sink) -> {
final var page = response.block();
sink.next(page);
if (page.next().isPresent()) {
return client.getNextPage(page);
}
sink.complete();
return null;
});
return pageFlux.flatMapIterable(Page::cards).map(mapper::convert);
}
完整代码在这里:https://github.com/myersadamk/mtg-api-client
使用 expand
,我向 client::getNextPage()
添加了打印。如您所见,创建的图表进行了冗余调用。
Getting https://api.magicthegathering.io/v1/cards?page=1
Getting https://api.magicthegathering.io/v1/cards?page=7
Getting https://api.magicthegathering.io/v1/cards?page=2
Getting https://api.magicthegathering.io/v1/cards?page=8
Getting https://api.magicthegathering.io/v1/cards?page=3
Getting https://api.magicthegathering.io/v1/cards?page=9
Getting https://api.magicthegathering.io/v1/cards?page=4
Getting https://api.magicthegathering.io/v1/cards?page=10
Getting https://api.magicthegathering.io/v1/cards?page=5
Getting https://api.magicthegathering.io/v1/cards?page=11
Getting https://api.magicthegathering.io/v1/cards?page=6
Getting https://api.magicthegathering.io/v1/cards?page=12
Getting https://api.magicthegathering.io/v1/cards?page=7
我想要更多这样的东西:
Getting https://api.magicthegathering.io/v1/cards?page=1
Getting https://api.magicthegathering.io/v1/cards?page=2
Getting https://api.magicthegathering.io/v1/cards?page=3
(最后说明:并行化并直接调用 URI 肯定会更快,但是绕过 next/last 机制和 hard-code URI 感觉有点傻。我可能会结束这样做,但仍然想解决这个问题。)
好的,我想出了一些有用的方法。我决定使用页面计数方法来尝试并行化,尽管它并没有更快,因为网络 IO 仍然是瓶颈。我可能会回到 header link 爬网并使用缓存。大致来说,神奇的数字和所有,这就是它的样子:
@Override
public Flux<Card> getAllCards() {
return client.getPageCount().flatMapMany(pageCount ->
Flux.concat(
range(1, pageCount)
.parallel(pageCount / 6).runOn(Schedulers.parallel())
.map(client::getPage)
).map(Page::cards).flatMap(Flux::fromIterable).map(mapper::convert)
);
}
我认为这是顺序 non-blocking 方法:
public Flux<Card> getAllCards() {
PaginationParams paginationParams = new PaginationParams();
final Flux<Page> pageFlux = Mono
.defer(() -> client.getPage(paginationParams))
.doOnNext(page -> {
if (page.next().isPresent()) {
paginationParams.setPage(page.next().get());
} else {
paginationParams.setPage(null);
}
})
.repeat(() -> paginationParams.getPage() != null);
return pageFlux.flatMapIterable(Page::cards).map(mapper::convert);
}