RxJS: takeUntil() Angular 组件的 ngOnDestroy()
RxJS: takeUntil() Angular component's ngOnDestroy()
tl;dr:基本上我想将 Angular 的 ngOnDestroy
与 Rxjs takeUntil()
运算符结合起来。 -- 这可能吗?
我有一个 Angular 组件可以打开多个 Rxjs 订阅。
这些需要在组件销毁时关闭。
一个简单的解决方案是:
class myComponent {
private subscriptionA;
private subscriptionB;
private subscriptionC;
constructor(
private serviceA: ServiceA,
private serviceB: ServiceB,
private serviceC: ServiceC) {}
ngOnInit() {
this.subscriptionA = this.serviceA.subscribe(...);
this.subscriptionB = this.serviceB.subscribe(...);
this.subscriptionC = this.serviceC.subscribe(...);
}
ngOnDestroy() {
this.subscriptionA.unsubscribe();
this.subscriptionB.unsubscribe();
this.subscriptionC.unsubscribe();
}
}
这可行,但有点多余。我特别不喜欢那样
- unsubscribe()
在别的地方,所以你要记住它们是链接的。
- 组件状态被订阅污染。
我更喜欢使用 takeUntil()
运算符或类似的东西,使它看起来像这样:
class myComponent {
constructor(
private serviceA: ServiceA,
private serviceB: ServiceB,
private serviceC: ServiceC) {}
ngOnInit() {
const destroy = Observable.fromEvent(???).first();
this.subscriptionA = this.serviceA.subscribe(...).takeUntil(destroy);
this.subscriptionB = this.serviceB.subscribe(...).takeUntil(destroy);
this.subscriptionC = this.serviceC.subscribe(...).takeUntil(destroy);
}
}
是否有销毁事件或类似事件可以让我使用 takeUntil()
或其他方式来简化组件架构?
我意识到我可以在构造函数中自己创建一个事件或在 ngOnDestroy()
中触发的事件,但最终不会使事情变得更容易阅读。
您可以为此利用 ReplaySubject
:
编辑: 不同于 RxJS 6.x:
注意pipe()
方法的使用。
class myComponent {
private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);
constructor(
private serviceA: ServiceA,
private serviceB: ServiceB,
private serviceC: ServiceC) {}
ngOnInit() {
this.serviceA
.pipe(takeUntil(this.destroyed$))
.subscribe(...);
this.serviceB
.pipe(takeUntil(this.destroyed$))
.subscribe(...);
this.serviceC
.pipe(takeUntil(this.destroyed$))
.subscribe(...);
}
ngOnDestroy() {
this.destroyed$.next(true);
this.destroyed$.complete();
}
}
这仅对 RxJS 5.x 及更早版本有效:
class myComponentOld {
private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);
constructor(private serviceA: ServiceA) {}
ngOnInit() {
this.serviceA
.takeUntil(this.destroyed$)
.subscribe(...);
}
ngOnDestroy() {
this.destroyed$.next(true);
this.destroyed$.complete();
}
}
好吧,这归结为您关闭订阅的意思。基本上有两种方法可以做到这一点:
- 使用完成链的运算符(例如
takeWhile()
)。
- 取消订阅源 Observable。
很高兴知道这两个不同。
例如使用 takeWhile()
时,您让操作员发送 complete
通知,该通知会传播给您的观察者。所以如果你定义:
...
.subscribe(..., ..., () => doWhatever());
然后当你用例如完成链。 takeWhile()
doWhatever()
函数将被调用。
例如它可能看起来像这样:
const Observable = Rx.Observable;
const Subject = Rx.Subject;
let source = Observable.timer(0, 1000);
let subject = new Subject();
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 1'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 2'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 3'));
setTimeout(() => {
subject.next();
}, 3000);
3 秒后将调用所有完整的回调。
另一方面,当您取消订阅时,您表示您不再对源 Observable 生成的项目感兴趣。然而,这并不意味着源必须完成。你就是不在乎了。
这意味着您可以收集来自 .subscribe(...)
个电话的所有 Subscription
并立即取消订阅:
let subscriptions = new Rx.Subscription();
let source = Observable.timer(0, 1000);
subscriptions.add(source.subscribe(null, null, () => console.log('complete 1')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 2')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 3')));
setTimeout(() => {
subscriptions.unsubscribe();
}, 3000);
现在,在 3 秒延迟后,不会向控制台打印任何内容,因为我们取消订阅并且没有调用完整的回调。
所以你想使用什么取决于你和你的用例。请注意,取消订阅与完成订阅不同,尽管我猜在您的情况下这并不重要。
使用 npm 包 @w11k/ngx-componentdestroyed 中的 componentDestroyed()
函数是迄今为止使用 takeUntil:
最简单的方法
@Component({
selector: 'foo',
templateUrl: './foo.component.html'
})
export class FooComponent implements OnInit, OnDestroy {
ngOnInit() {
Observable.interval(1000)
.takeUntil(componentDestroyed(this)) // <--- magic is here!
.subscribe(console.log);
}
ngOnDestroy() {}
}
这里是 componentDestroyed()
的一个版本,可以直接包含在您的代码中:
// Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed
import { OnDestroy } from '@angular/core';
import { ReplaySubject } from 'rxjs/ReplaySubject';
export function componentDestroyed(component: OnDestroy) {
const oldNgOnDestroy = component.ngOnDestroy;
const destroyed$ = new ReplaySubject<void>(1);
component.ngOnDestroy = () => {
oldNgOnDestroy.apply(component);
destroyed$.next(undefined);
destroyed$.complete();
};
return destroyed$;
}
如果您的组件直接绑定到路由,您可以利用 Router
events with takeUntil()
避免添加状态。这样,一旦您离开该组件,它就会自动为您清理其订阅。
import { Component, OnInit } from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
import { MyService } from './my.service';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/takeUntil';
@Component({
...
})
export class ExampleComopnent implements OnInit {
constructor(private router: Router, private myService: MyService) {
}
ngOnInit() {
this.myService.methodA()
.takeUntil(this.router.events)
.subscribe(dataA => {
...
});
this.myService.methodB()
.takeUntil(this.router.events)
.subscribe(dataB => {
...
});
}
}
注意:这个简单的示例没有考虑受保护的路线或取消的路线导航。如果有机会 one of the router events could be triggered but route navigation gets cancelled, you'll need to filter on the router events so it gets triggered at the appropriate point - for example, after the Route Guard 检查或导航完成后。
this.myService.methodA()
.takeUntil(this.router.events.filter(e => e instanceOf NavigationEnd))
.subscribe(dataA => {
...
});
创建基地class
import { Subject } from 'rxjs/Rx';
import { OnDestroy } from '@angular/core';
export abstract class Base implements OnDestroy {
protected componentDestroyed$: Subject<any>;
constructor() {
this.componentDestroyed$ = new Subject<void>();
const destroyFunc = this.ngOnDestroy;
this.ngOnDestroy = () => {
destroyFunc.bind(this)();
this.componentDestroyed$.next();
this.componentDestroyed$.complete();
};
}
// placeholder of ngOnDestroy. no need to do super() call of extended class.
public ngOnDestroy() {
// no-op
}
}
组件将是,
扩展基础 class
export class Test extends Base{
}
订阅期间
service.takeUntil(this.componentDestroyed$
.subscribe(...)
这是一个全局级别的更改,无论何时您想要订阅,都可以在整个项目中使用相同的方法。在需要的任何更改中,您可以在基础 class
中进行修改
请使用带 TakeUntil 的多态性(2022 年 4 月 13 日)
如果您在制作的每个组件中都写 protected destroy$ = new Subject<void>();
,那么您应该问自己,“为什么我没有遵循 DRY(不要重复自己)原理?
为了遵循 DRY 原则,创建一个抽象基础组件(抽象 类 不能直接实例化)来处理你的销毁信号。
@Component({ template: '' })
export abstract class BaseComponent extends Subscribable {
// Don't let the outside world trigger this destroy signal.
// It's only meant to be trigger by the component when destroyed!
private _destroy = new Subject<void>();
public destroy$ = this._destroy as Observable<void>;
/** Lifecycle hook called by angular framework when extended class dies. */
ngOnDestroy(): void {
this._destroy.next();
}
}
做一个方便的扩展函数来简化事情。
declare module 'rxjs/internal/Observable' {
interface Observable<T> {
dieWith(comp: BaseComponent): Observable<T>;
}
}
Observable.prototype.dieWith = function<T>(comp: BaseComponent): Observable<T> {
return this.pipe(takeUntil(comp.destroy$));
};
只要您需要处理订阅,就扩展您的 BaseComponent。
@Component({ ... })
export class myComponent extends BaseComponent {
constructor(
private serviceA: ServiceA,
private serviceB: ServiceB,
private serviceC: ServiceC
) {
super();
}
ngOnInit() {
this.serviceA.a$.dieWith(this).subscribe(...);
this.serviceB.b$.dieWith(this).subscribe(...);
this.serviceC.c$.dieWith(this).subscribe(...);
}
}
您已经像专业人士一样在 Angular 组件中正式处理订阅。
您的同事稍后会感谢您!
编码愉快!
tl;dr:基本上我想将 Angular 的 ngOnDestroy
与 Rxjs takeUntil()
运算符结合起来。 -- 这可能吗?
我有一个 Angular 组件可以打开多个 Rxjs 订阅。 这些需要在组件销毁时关闭。
一个简单的解决方案是:
class myComponent {
private subscriptionA;
private subscriptionB;
private subscriptionC;
constructor(
private serviceA: ServiceA,
private serviceB: ServiceB,
private serviceC: ServiceC) {}
ngOnInit() {
this.subscriptionA = this.serviceA.subscribe(...);
this.subscriptionB = this.serviceB.subscribe(...);
this.subscriptionC = this.serviceC.subscribe(...);
}
ngOnDestroy() {
this.subscriptionA.unsubscribe();
this.subscriptionB.unsubscribe();
this.subscriptionC.unsubscribe();
}
}
这可行,但有点多余。我特别不喜欢那样
- unsubscribe()
在别的地方,所以你要记住它们是链接的。
- 组件状态被订阅污染。
我更喜欢使用 takeUntil()
运算符或类似的东西,使它看起来像这样:
class myComponent {
constructor(
private serviceA: ServiceA,
private serviceB: ServiceB,
private serviceC: ServiceC) {}
ngOnInit() {
const destroy = Observable.fromEvent(???).first();
this.subscriptionA = this.serviceA.subscribe(...).takeUntil(destroy);
this.subscriptionB = this.serviceB.subscribe(...).takeUntil(destroy);
this.subscriptionC = this.serviceC.subscribe(...).takeUntil(destroy);
}
}
是否有销毁事件或类似事件可以让我使用 takeUntil()
或其他方式来简化组件架构?
我意识到我可以在构造函数中自己创建一个事件或在 ngOnDestroy()
中触发的事件,但最终不会使事情变得更容易阅读。
您可以为此利用 ReplaySubject
:
编辑: 不同于 RxJS 6.x:
注意pipe()
方法的使用。
class myComponent {
private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);
constructor(
private serviceA: ServiceA,
private serviceB: ServiceB,
private serviceC: ServiceC) {}
ngOnInit() {
this.serviceA
.pipe(takeUntil(this.destroyed$))
.subscribe(...);
this.serviceB
.pipe(takeUntil(this.destroyed$))
.subscribe(...);
this.serviceC
.pipe(takeUntil(this.destroyed$))
.subscribe(...);
}
ngOnDestroy() {
this.destroyed$.next(true);
this.destroyed$.complete();
}
}
这仅对 RxJS 5.x 及更早版本有效:
class myComponentOld {
private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);
constructor(private serviceA: ServiceA) {}
ngOnInit() {
this.serviceA
.takeUntil(this.destroyed$)
.subscribe(...);
}
ngOnDestroy() {
this.destroyed$.next(true);
this.destroyed$.complete();
}
}
好吧,这归结为您关闭订阅的意思。基本上有两种方法可以做到这一点:
- 使用完成链的运算符(例如
takeWhile()
)。 - 取消订阅源 Observable。
很高兴知道这两个不同。
例如使用 takeWhile()
时,您让操作员发送 complete
通知,该通知会传播给您的观察者。所以如果你定义:
...
.subscribe(..., ..., () => doWhatever());
然后当你用例如完成链。 takeWhile()
doWhatever()
函数将被调用。
例如它可能看起来像这样:
const Observable = Rx.Observable;
const Subject = Rx.Subject;
let source = Observable.timer(0, 1000);
let subject = new Subject();
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 1'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 2'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 3'));
setTimeout(() => {
subject.next();
}, 3000);
3 秒后将调用所有完整的回调。
另一方面,当您取消订阅时,您表示您不再对源 Observable 生成的项目感兴趣。然而,这并不意味着源必须完成。你就是不在乎了。
这意味着您可以收集来自 .subscribe(...)
个电话的所有 Subscription
并立即取消订阅:
let subscriptions = new Rx.Subscription();
let source = Observable.timer(0, 1000);
subscriptions.add(source.subscribe(null, null, () => console.log('complete 1')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 2')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 3')));
setTimeout(() => {
subscriptions.unsubscribe();
}, 3000);
现在,在 3 秒延迟后,不会向控制台打印任何内容,因为我们取消订阅并且没有调用完整的回调。
所以你想使用什么取决于你和你的用例。请注意,取消订阅与完成订阅不同,尽管我猜在您的情况下这并不重要。
使用 npm 包 @w11k/ngx-componentdestroyed 中的 componentDestroyed()
函数是迄今为止使用 takeUntil:
@Component({
selector: 'foo',
templateUrl: './foo.component.html'
})
export class FooComponent implements OnInit, OnDestroy {
ngOnInit() {
Observable.interval(1000)
.takeUntil(componentDestroyed(this)) // <--- magic is here!
.subscribe(console.log);
}
ngOnDestroy() {}
}
这里是 componentDestroyed()
的一个版本,可以直接包含在您的代码中:
// Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed
import { OnDestroy } from '@angular/core';
import { ReplaySubject } from 'rxjs/ReplaySubject';
export function componentDestroyed(component: OnDestroy) {
const oldNgOnDestroy = component.ngOnDestroy;
const destroyed$ = new ReplaySubject<void>(1);
component.ngOnDestroy = () => {
oldNgOnDestroy.apply(component);
destroyed$.next(undefined);
destroyed$.complete();
};
return destroyed$;
}
如果您的组件直接绑定到路由,您可以利用 Router
events with takeUntil()
避免添加状态。这样,一旦您离开该组件,它就会自动为您清理其订阅。
import { Component, OnInit } from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
import { MyService } from './my.service';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/takeUntil';
@Component({
...
})
export class ExampleComopnent implements OnInit {
constructor(private router: Router, private myService: MyService) {
}
ngOnInit() {
this.myService.methodA()
.takeUntil(this.router.events)
.subscribe(dataA => {
...
});
this.myService.methodB()
.takeUntil(this.router.events)
.subscribe(dataB => {
...
});
}
}
注意:这个简单的示例没有考虑受保护的路线或取消的路线导航。如果有机会 one of the router events could be triggered but route navigation gets cancelled, you'll need to filter on the router events so it gets triggered at the appropriate point - for example, after the Route Guard 检查或导航完成后。
this.myService.methodA()
.takeUntil(this.router.events.filter(e => e instanceOf NavigationEnd))
.subscribe(dataA => {
...
});
创建基地class
import { Subject } from 'rxjs/Rx';
import { OnDestroy } from '@angular/core';
export abstract class Base implements OnDestroy {
protected componentDestroyed$: Subject<any>;
constructor() {
this.componentDestroyed$ = new Subject<void>();
const destroyFunc = this.ngOnDestroy;
this.ngOnDestroy = () => {
destroyFunc.bind(this)();
this.componentDestroyed$.next();
this.componentDestroyed$.complete();
};
}
// placeholder of ngOnDestroy. no need to do super() call of extended class.
public ngOnDestroy() {
// no-op
}
}
组件将是,
扩展基础 class
export class Test extends Base{
}
订阅期间
service.takeUntil(this.componentDestroyed$
.subscribe(...)
这是一个全局级别的更改,无论何时您想要订阅,都可以在整个项目中使用相同的方法。在需要的任何更改中,您可以在基础 class
中进行修改请使用带 TakeUntil 的多态性(2022 年 4 月 13 日)
如果您在制作的每个组件中都写 protected destroy$ = new Subject<void>();
,那么您应该问自己,“为什么我没有遵循 DRY(不要重复自己)原理?
为了遵循 DRY 原则,创建一个抽象基础组件(抽象 类 不能直接实例化)来处理你的销毁信号。
@Component({ template: '' })
export abstract class BaseComponent extends Subscribable {
// Don't let the outside world trigger this destroy signal.
// It's only meant to be trigger by the component when destroyed!
private _destroy = new Subject<void>();
public destroy$ = this._destroy as Observable<void>;
/** Lifecycle hook called by angular framework when extended class dies. */
ngOnDestroy(): void {
this._destroy.next();
}
}
做一个方便的扩展函数来简化事情。
declare module 'rxjs/internal/Observable' {
interface Observable<T> {
dieWith(comp: BaseComponent): Observable<T>;
}
}
Observable.prototype.dieWith = function<T>(comp: BaseComponent): Observable<T> {
return this.pipe(takeUntil(comp.destroy$));
};
只要您需要处理订阅,就扩展您的 BaseComponent。
@Component({ ... })
export class myComponent extends BaseComponent {
constructor(
private serviceA: ServiceA,
private serviceB: ServiceB,
private serviceC: ServiceC
) {
super();
}
ngOnInit() {
this.serviceA.a$.dieWith(this).subscribe(...);
this.serviceB.b$.dieWith(this).subscribe(...);
this.serviceC.c$.dieWith(this).subscribe(...);
}
}
您已经像专业人士一样在 Angular 组件中正式处理订阅。
您的同事稍后会感谢您!
编码愉快!