RxJs 可观察分页
RxJs Observable Pagination
第一:这是我第一个使用RxJs的项目,我觉得用它会学得最好。
我找到了这个答案:Turning paginated requests into an Observable stream with RxJs
但它在评论中说:
You're exceeding the maximum call stack still. At around 430 pages returned. I think recursion might not be the best solution here
我想查询 Youtube 数据 API,结果以页面形式返回,我需要对它们进行分页。
我想象这样的工作流程可以工作:
1)发起呼叫
2) 检查响应是否有 'nextPageToken'
3) 如果有,再向 Youtube 发送一个请求 API
4)如果没有,完成
So to do this I could Imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream -A-A-A-A--X-------->
RequestStream -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination
(不确定这张图是否按照我制作的方式正确)
所以ResponseStream依赖于FirstRequestStream和RequestStream(使用merge函数)。 RequestStream 依赖于 ResponseStream(这称为循环可观察对象吗?)
-这是正确的方法吗?
-'circulating observables' 是一件好事吗,它们甚至可能吗?(我在创建一个时遇到了问题)。
-我应该先尝试其他方法吗?
-是否可以创建相互依赖的可观察流?
感谢您的帮助。
你把这个问题复杂化了,使用 defer 运算符可以更容易地解决它。
想法是您正在创建延迟的可观察对象(因此它只会在订阅后创建并开始获取数据)并将其与相同的可观察对象连接起来,但对于下一页,下一页也将与下一页连接,等等 ... 。所有这些都可以在没有递归的情况下完成。
代码如下所示:
const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")
// simulate network request
function fetchPage(page=0) {
return timer(100).pipe(
tap(() => console.log(`-> fetched page ${page}`)),
mapTo({
items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
nextPage: page + 1,
})
);
}
const getItems = page => defer(() => fetchPage(page)).pipe(
mergeMap(({ items, nextPage }) => {
const items$ = from(items);
const next$ = nextPage ? getItems(nextPage) : EMPTY;
return concat(items$, next$);
})
);
// process only first 30 items, without fetching all of the data
getItems()
.pipe(take(30))
.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
我无耻地重用了 Oles Savluk 的代码片段,它具有很好的 fetchPage
功能,并且我应用了 Picci 链接到的博客文章中解释的想法(在评论中),使用 expand
.
Article on expand by Nicholas Jamieson
它给出了一个稍微简单的代码,在 expand
调用中隐藏了递归(如果需要,文章的评论显示了如何对其进行线性化)。
const { timer, EMPTY } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")
// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
return timer(1000).pipe(
tap(() => console.log(`-> fetched page ${page}`)),
mapTo({
items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
nextPage: ++page === pageNumber ? undefined : page,
}),
);
}
const list = fetchPage().pipe(
expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : EMPTY),
concatMap(({ items }) => items),
// Transforms the stream of numbers (Observable<number>)
// to a stream with only an array of numbers (Observable<number[]>).
// Remove if you want a stream of numbers, not waiting for all requests to complete.
toArray(),
);
list.subscribe(console.log);
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
这是我使用 rxjs 运算符 expand
、reduce
和 empty
以及 HttpClient 模块的解决方案:
假设您的 API 响应是一个包含如下形状的对象
interface Response {
data: items[]; // array of result items
next: string|null; // url of next page, or null if there are no more items
}
您可以像这样使用展开和缩小
getAllResults(url) {
return this.http.get(url).pipe(
expand((res) => res.next ? this.http.get(res.next) : EMPTY),
reduce((acc, res) => acc.concat(res.data), [])
);
}
LuJaks 无疑是最简单的方法!
举一个单行示例,假设您有一个函数可以对给定页面发出 http 请求,并且 returns 一个(部分)数据数组。我们调用该函数直到服务器 returns 空数组 :
import { Observable, EMPTY, of } from "rxjs";
import { expand, reduce } from "rxjs/operators";
// Mock a request that returns only 5 pages...
function httpGet(p): Observable<number[]> {
if (p > 5) { return of([]); }
return of(new Array(10).fill(0).map((_, i) => p * 10 + i));
}
httpGet(0).pipe( // get the fist page
expand((value, index) => (value.length > 0 ? httpGet(index + 1) : EMPTY)), // other pages
reduce((a, v) => [...a, ...v], []), // optional if you want only one emit
).subscribe((x) => console.log(x));
第一:这是我第一个使用RxJs的项目,我觉得用它会学得最好。
我找到了这个答案:Turning paginated requests into an Observable stream with RxJs 但它在评论中说:
You're exceeding the maximum call stack still. At around 430 pages returned. I think recursion might not be the best solution here
我想查询 Youtube 数据 API,结果以页面形式返回,我需要对它们进行分页。 我想象这样的工作流程可以工作: 1)发起呼叫 2) 检查响应是否有 'nextPageToken' 3) 如果有,再向 Youtube 发送一个请求 API 4)如果没有,完成
So to do this I could Imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream -A-A-A-A--X-------->
RequestStream -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination
(不确定这张图是否按照我制作的方式正确)
所以ResponseStream依赖于FirstRequestStream和RequestStream(使用merge函数)。 RequestStream 依赖于 ResponseStream(这称为循环可观察对象吗?)
-这是正确的方法吗?
-'circulating observables' 是一件好事吗,它们甚至可能吗?(我在创建一个时遇到了问题)。
-我应该先尝试其他方法吗?
-是否可以创建相互依赖的可观察流?
感谢您的帮助。
你把这个问题复杂化了,使用 defer 运算符可以更容易地解决它。
想法是您正在创建延迟的可观察对象(因此它只会在订阅后创建并开始获取数据)并将其与相同的可观察对象连接起来,但对于下一页,下一页也将与下一页连接,等等 ... 。所有这些都可以在没有递归的情况下完成。
代码如下所示:
const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")
// simulate network request
function fetchPage(page=0) {
return timer(100).pipe(
tap(() => console.log(`-> fetched page ${page}`)),
mapTo({
items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
nextPage: page + 1,
})
);
}
const getItems = page => defer(() => fetchPage(page)).pipe(
mergeMap(({ items, nextPage }) => {
const items$ = from(items);
const next$ = nextPage ? getItems(nextPage) : EMPTY;
return concat(items$, next$);
})
);
// process only first 30 items, without fetching all of the data
getItems()
.pipe(take(30))
.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
我无耻地重用了 Oles Savluk 的代码片段,它具有很好的 fetchPage
功能,并且我应用了 Picci 链接到的博客文章中解释的想法(在评论中),使用 expand
.
Article on expand by Nicholas Jamieson
它给出了一个稍微简单的代码,在 expand
调用中隐藏了递归(如果需要,文章的评论显示了如何对其进行线性化)。
const { timer, EMPTY } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")
// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
return timer(1000).pipe(
tap(() => console.log(`-> fetched page ${page}`)),
mapTo({
items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
nextPage: ++page === pageNumber ? undefined : page,
}),
);
}
const list = fetchPage().pipe(
expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : EMPTY),
concatMap(({ items }) => items),
// Transforms the stream of numbers (Observable<number>)
// to a stream with only an array of numbers (Observable<number[]>).
// Remove if you want a stream of numbers, not waiting for all requests to complete.
toArray(),
);
list.subscribe(console.log);
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
这是我使用 rxjs 运算符 expand
、reduce
和 empty
以及 HttpClient 模块的解决方案:
假设您的 API 响应是一个包含如下形状的对象
interface Response {
data: items[]; // array of result items
next: string|null; // url of next page, or null if there are no more items
}
您可以像这样使用展开和缩小
getAllResults(url) {
return this.http.get(url).pipe(
expand((res) => res.next ? this.http.get(res.next) : EMPTY),
reduce((acc, res) => acc.concat(res.data), [])
);
}
LuJaks 无疑是最简单的方法!
举一个单行示例,假设您有一个函数可以对给定页面发出 http 请求,并且 returns 一个(部分)数据数组。我们调用该函数直到服务器 returns 空数组 :
import { Observable, EMPTY, of } from "rxjs";
import { expand, reduce } from "rxjs/operators";
// Mock a request that returns only 5 pages...
function httpGet(p): Observable<number[]> {
if (p > 5) { return of([]); }
return of(new Array(10).fill(0).map((_, i) => p * 10 + i));
}
httpGet(0).pipe( // get the fist page
expand((value, index) => (value.length > 0 ? httpGet(index + 1) : EMPTY)), // other pages
reduce((a, v) => [...a, ...v], []), // optional if you want only one emit
).subscribe((x) => console.log(x));