RXJS:从分页界面创建可观察流的惯用方法
RXJS : Idiomatic way to create an observable stream from a paged interface
我有分页界面。给定起点,请求将生成结果列表和持续指示器。
我创建了一个 observable,它是通过构造和平面映射一个读取页面的 observable 来构建的。这个 observable 的结果包含页面的数据和要继续的值。我提取数据并将其平面映射到订阅者。产生价值流。
为了处理分页,我为下一页值创建了一个主题。它以初始值播种,然后每次我收到包含有效下一页的响应时,我都会推送到页面主题并触发另一次读取,直到没有更多内容可读为止。
有没有更惯用的方法来做到这一点?
function records(start = 'LATEST', limit = 1000) {
let pages = new rx.Subject();
this.connect(start)
.subscribe(page => pages.onNext(page));
let records = pages
.flatMap(page => {
return this.read(page, limit)
.doOnNext(result => {
let next = result.next;
if (next === undefined) {
pages.onCompleted();
} else {
pages.onNext(next);
}
});
})
.pluck('data')
.flatMap(data => data);
return records;
}
这是一种合理的方法。它有几个潜在的缺陷(可能会或可能不会影响您,具体取决于您的用例):
- 您没有提供任何方法来观察
this.connect(start)
中发生的任何错误
- 您的观察结果实际上是 hot。如果调用者没有立即订阅可观察对象(也许他们存储它并稍后订阅),那么他们将错过
this.connect(start)
的完成并且可观察对象似乎永远不会生产任何东西。
- 如果来电者改变主意并提前取消订阅,您无法提供从初始
connect
电话中取消订阅的方法。没什么大不了的,但通常当一个人构造一个可观察的对象时,应该尝试将一次性对象链接在一起,以便在调用者取消订阅时正确调用清理。
这是修改后的版本:
- 它将错误从
this.connect
传递给观察者。
- 它使用
Observable.create
创建一个 cold observable,它只有在调用者实际订阅时才开始运行,因此不会丢失初始页面值并停止运行流。
- 它将
this.connect
订阅一次性与整体订阅一次性结合
代码:
function records(start = 'LATEST', limit = 1000) {
return Rx.Observable.create(observer => {
let pages = new Rx.Subject();
let connectSub = new Rx.SingleAssignmentDisposable();
let resultsSub = new Rx.SingleAssignmentDisposable();
let sub = new Rx.CompositeDisposable(connectSub, resultsSub);
// Make sure we subscribe to pages before we issue this.connect()
// just in case this.connect() finishes synchronously (possible if it caches values or something?)
let results = pages
.flatMap(page => this.read(page, limit))
.doOnNext(r => this.next !== undefined ? pages.onNext(this.next) : pages.onCompleted())
.flatMap(r => r.data);
resultsSub.setDisposable(results.subscribe(observer));
// now query the first page
connectSub.setDisposable(this.connect(start)
.subscribe(p => pages.onNext(p), e => observer.onError(e)));
return sub;
});
}
注意:我以前没有使用过 ES6 语法,所以希望我没有弄乱这里的任何内容。
我有分页界面。给定起点,请求将生成结果列表和持续指示器。
我创建了一个 observable,它是通过构造和平面映射一个读取页面的 observable 来构建的。这个 observable 的结果包含页面的数据和要继续的值。我提取数据并将其平面映射到订阅者。产生价值流。
为了处理分页,我为下一页值创建了一个主题。它以初始值播种,然后每次我收到包含有效下一页的响应时,我都会推送到页面主题并触发另一次读取,直到没有更多内容可读为止。
有没有更惯用的方法来做到这一点?
function records(start = 'LATEST', limit = 1000) {
let pages = new rx.Subject();
this.connect(start)
.subscribe(page => pages.onNext(page));
let records = pages
.flatMap(page => {
return this.read(page, limit)
.doOnNext(result => {
let next = result.next;
if (next === undefined) {
pages.onCompleted();
} else {
pages.onNext(next);
}
});
})
.pluck('data')
.flatMap(data => data);
return records;
}
这是一种合理的方法。它有几个潜在的缺陷(可能会或可能不会影响您,具体取决于您的用例):
- 您没有提供任何方法来观察
this.connect(start)
中发生的任何错误
- 您的观察结果实际上是 hot。如果调用者没有立即订阅可观察对象(也许他们存储它并稍后订阅),那么他们将错过
this.connect(start)
的完成并且可观察对象似乎永远不会生产任何东西。 - 如果来电者改变主意并提前取消订阅,您无法提供从初始
connect
电话中取消订阅的方法。没什么大不了的,但通常当一个人构造一个可观察的对象时,应该尝试将一次性对象链接在一起,以便在调用者取消订阅时正确调用清理。
这是修改后的版本:
- 它将错误从
this.connect
传递给观察者。 - 它使用
Observable.create
创建一个 cold observable,它只有在调用者实际订阅时才开始运行,因此不会丢失初始页面值并停止运行流。 - 它将
this.connect
订阅一次性与整体订阅一次性结合
代码:
function records(start = 'LATEST', limit = 1000) {
return Rx.Observable.create(observer => {
let pages = new Rx.Subject();
let connectSub = new Rx.SingleAssignmentDisposable();
let resultsSub = new Rx.SingleAssignmentDisposable();
let sub = new Rx.CompositeDisposable(connectSub, resultsSub);
// Make sure we subscribe to pages before we issue this.connect()
// just in case this.connect() finishes synchronously (possible if it caches values or something?)
let results = pages
.flatMap(page => this.read(page, limit))
.doOnNext(r => this.next !== undefined ? pages.onNext(this.next) : pages.onCompleted())
.flatMap(r => r.data);
resultsSub.setDisposable(results.subscribe(observer));
// now query the first page
connectSub.setDisposable(this.connect(start)
.subscribe(p => pages.onNext(p), e => observer.onError(e)));
return sub;
});
}
注意:我以前没有使用过 ES6 语法,所以希望我没有弄乱这里的任何内容。