Angular & RX:改进订阅收集

Angular & RX: improved subscription collection

更新

找到解决方案后,我根据已接受的答案编写了一个小助手 ng2-rx-collector 以使其更易于使用。希望它能帮助那些一次又一次面临同样问题的人。

原题

假设我们有一个组件有两个对热可观察对象的订阅。我们在 ngOnInit 订阅它们并在 ngOnDestroy 取消订阅以避免内存泄漏/意外行为:

public ngOnInit() {
  this.s1 = o1.subscribe(console.debug.bind(console));
  this.s2 = o2.subscribe(console.debug.bind(console));
}

public ngOnDestroy() {
  this.s1.unsubscribe();
  this.s2.unsubscribe();
}

我喜欢 Rx,但每次我需要遵循这个时,我真的想自杀:

  1. 为每个订阅创建私人订阅属性
  2. 将此 属性 分配给订阅(这看起来真的很难看,因为真正的逻辑在右边)
  3. 在销毁时取消订阅每个订阅

有什么办法可以改善吗?

例如在 RxSwift 中,他们有一个 DisposeBag 以改进流程,翻译成打字稿将是:

private let bag = DisposeBag();

...

o1.subscribe(...).addDisposableTo(bag);

然后只销毁一次。问题是我找不到任何类似的 Subscription 函数。

任何想法/提示都将受到热烈欢迎。

你可以这样做:

private teardown$ = new Subject<void>();

public ngOnInit() {
    o1.takeUntil(this.teardown$).subscribe(console.debug.bind(console));
    o2.takeUntil(this.teardown$).subscribe(console.debug.bind(console));
}

public ngOnDestroy() {
   this.teardown$.next();
}

您描述的内容在 RxJS 4(如果我错了请纠正我)或在 RxPHP 中也被称为 "disposables"。在 RxJS 5 中,这称为订阅,但目的完全相同。

在下面的示例中,我有两个源 Observable。我用一个 Subscription 对象包装了他们的取消订阅调用,该对象在调用其 unsubscribe() 方法时将回调作为参数使用。

var source1 = Observable.interval(250);
var source2 = Observable.interval(350);

let sub1 = source1.subscribe(val => console.log(val));
let sub2 = source2.subscribe(val => console.log(val));

let subscriptions = new Subscription(() => {
    sub1.unsubscribe();
    sub2.unsubscribe();
});

setTimeout(() => {
    subscriptions.unsubscribe();
}, 3000);

同样,我也可以从 source1.subscribe 中获取第一个 Subscription 并添加另一个 Subscription ,它将与它自己的 unsubscribe() 调用一起使用 add()方法:

var source1 = Observable.interval(250);
var source2 = Observable.interval(350);

let subscriptions = source1.subscribe(val => console.log(val));
subscriptions.add(source2.subscribe(val => console.log(val)));

setTimeout(() => {
    subscriptions.unsubscribe();
}, 3000);

有关更多信息,请查看源代码:https://github.com/ReactiveX/rxjs/blob/master/src/Subscription.ts