如何在 rxjs 中对每个订阅进行一次可观察管道的初始化逻辑
How to do initialization logic on an observable pipe once per subscription in rxjs
所以我有这个可观察的管道,我需要在订阅开始时执行一次操作,就像您可以使用 finalize()
在订阅结束时执行一次操作一样
这就是我的开始,不幸的是,它会在每次针对该主题的 next()
调用中执行一次启动。
const notificationSubject = new BehaviorSubject<Notification | undefined>(undefined);
const notifications$ = this.notificationSubject.pipe(
tap(() => startup()),
filter(isValueDefined),
finalize(() => shutdown())
);
notifications$.subscribe(noti => foo(noti));
notifications$.subscribe(noti => bar(noti));
然后我们得到了这个变体:
let isStartedUp = false;
const internalStartup = () => {
if(!isStartedUp){
isStartedUp = true;
startup();
}
}
const notifications$ = notificationSubject.pipe(
tap(() => internalStartup()),
filter(isValueDefined),
finalize(() => shutdown())
);
notifications$.subscribe(noti => foo(noti));
notifications$.subscribe(noti => bar(noti));
...这是它的工作,但它做得有点太好了,因为现在启动只进行一次(并且只在第一次订阅时)而不是每次创建订阅一次。
我想应该有类似的东西,但我还没找到。
const notifications$ = notificationSubject.pipe(
initialize(() => startup()),
finalize(() => shutdown())
);
您可以使用 defer
在每次订阅时执行一些代码。
export function initialize<T>(initializer: () => void): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => defer(() => {
initializer();
return source;
});
}
const notifications$ = notificationSubject.pipe(
initialize(() => startup()),
finalize(() => shutdown())
);
所以我有这个可观察的管道,我需要在订阅开始时执行一次操作,就像您可以使用 finalize()
在订阅结束时执行一次操作一样
这就是我的开始,不幸的是,它会在每次针对该主题的 next()
调用中执行一次启动。
const notificationSubject = new BehaviorSubject<Notification | undefined>(undefined);
const notifications$ = this.notificationSubject.pipe(
tap(() => startup()),
filter(isValueDefined),
finalize(() => shutdown())
);
notifications$.subscribe(noti => foo(noti));
notifications$.subscribe(noti => bar(noti));
然后我们得到了这个变体:
let isStartedUp = false;
const internalStartup = () => {
if(!isStartedUp){
isStartedUp = true;
startup();
}
}
const notifications$ = notificationSubject.pipe(
tap(() => internalStartup()),
filter(isValueDefined),
finalize(() => shutdown())
);
notifications$.subscribe(noti => foo(noti));
notifications$.subscribe(noti => bar(noti));
...这是它的工作,但它做得有点太好了,因为现在启动只进行一次(并且只在第一次订阅时)而不是每次创建订阅一次。
我想应该有类似的东西,但我还没找到。
const notifications$ = notificationSubject.pipe(
initialize(() => startup()),
finalize(() => shutdown())
);
您可以使用 defer
在每次订阅时执行一些代码。
export function initialize<T>(initializer: () => void): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => defer(() => {
initializer();
return source;
});
}
const notifications$ = notificationSubject.pipe(
initialize(() => startup()),
finalize(() => shutdown())
);