RxJs 可观察分页

RxJs Observable Pagination

第一:这是我第一个使用RxJs的项目,我觉得用它会学得最好。

我找到了这个答案:Turning paginated requests into an Observable stream with RxJs 但它在评论中说:

You're exceeding the maximum call stack still. At around 430 pages returned. I think recursion might not be the best solution here

我想查询 Youtube 数据 API,结果以页面形式返回,我需要对它们进行分页。 我想象这样的工作流程可以工作: 1)发起呼叫 2) 检查响应是否有 'nextPageToken' 3) 如果有,再向 Youtube 发送一个请求 API 4)如果没有,完成

So to do this I could Imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream     -A-A-A-A--X-------->
RequestStream      -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination

(不确定这张图是否按照我制作的方式正确)

所以ResponseStream依赖于FirstRequestStream和RequestStream(使用merge函数)。 RequestStream 依赖于 ResponseStream(这称为循环可观察对象吗?)

-这是正确的方法吗?

-'circulating observables' 是一件好事吗,它们甚至可能吗?(我在创建一个时遇到了问题)。

-我应该先尝试其他方法吗?

-是否可以创建相互依赖的可观察流?

感谢您的帮助。

你把这个问题复杂化了,使用 defer 运算符可以更容易地解决它。

想法是您正在创建延迟的可观察对象(因此它只会在订阅后创建并开始获取数据)并将其与相同的可观察对象连接起来,但对于下一页,下一页也将与下一页连接,等等 ... 。所有这些都可以在没有递归的情况下完成。

代码如下所示:

const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
function fetchPage(page=0) {
  return timer(100).pipe(
    tap(() => console.log(`-> fetched page ${page}`)),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      nextPage: page + 1,
    })
  );
}

const getItems = page => defer(() => fetchPage(page)).pipe(
  mergeMap(({ items, nextPage }) => {
    const items$ = from(items);
    const next$ = nextPage ? getItems(nextPage) : EMPTY;
    return concat(items$, next$);
  })
);

// process only first 30 items, without fetching all of the data
getItems()
 .pipe(take(30))
 .subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>

我无耻地重用了 Oles Savluk 的代码片段,它具有很好的 fetchPage 功能,并且我应用了 Picci 链接到的博客文章中解释的想法(在评论中),使用 expand.

Article on expand by Nicholas Jamieson

它给出了一个稍微简单的代码,在 expand 调用中隐藏了递归(如果需要,文章的评论显示了如何对其进行线性化)。

const { timer, EMPTY } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
  return timer(1000).pipe(
    tap(() => console.log(`-> fetched page ${page}`)),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      nextPage: ++page === pageNumber ? undefined : page,
    }),
  );
}

const list = fetchPage().pipe(
  expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : EMPTY),
  concatMap(({ items }) => items),
  // Transforms the stream of numbers (Observable<number>)
  // to a stream with only an array of numbers (Observable<number[]>).
  // Remove if you want a stream of numbers, not waiting for all requests to complete.
  toArray(),
);

list.subscribe(console.log);
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>

这是我使用 rxjs 运算符 expandreduceempty 以及 HttpClient 模块的解决方案:

假设您的 API 响应是一个包含如下形状的对象

interface Response {
  data: items[]; // array of result items
  next: string|null; // url of next page, or null if there are no more items
}

您可以像这样使用展开和缩小

getAllResults(url) {
  return this.http.get(url).pipe(
    expand((res) => res.next ? this.http.get(res.next) : EMPTY),
    reduce((acc, res) => acc.concat(res.data), [])
  );
}

LuJaks 无疑是最简单的方法!

举一个单行示例,假设您有一个函数可以对给定页面发出 http 请求,并且 returns 一个(部分)数据数组。我们调用该函数直到服务器 returns 空数组 :

import { Observable, EMPTY, of } from "rxjs";
import { expand, reduce } from "rxjs/operators";

// Mock a request that returns only 5 pages... 
function httpGet(p): Observable<number[]> {
  if (p > 5) { return of([]); }
  return of(new Array(10).fill(0).map((_, i) => p * 10 + i));
}

httpGet(0).pipe( // get the fist page
    expand((value, index) => (value.length > 0 ? httpGet(index + 1) : EMPTY)), // other pages
    reduce((a, v) => [...a, ...v], []), // optional if you want only one emit
  ).subscribe((x) => console.log(x));