如何在 rxjs 中对每个订阅进行一次可观察管道的初始化逻辑

How to do initialization logic on an observable pipe once per subscription in rxjs

所以我有这个可观察的管道,我需要在订阅开始时执行一次操作,就像您可以使用 finalize() 在订阅结束时执行一次操作一样

这就是我的开始,不幸的是,它会在每次针对该主题的 next() 调用中执行一次启动。

    const notificationSubject = new BehaviorSubject<Notification | undefined>(undefined);
    const notifications$ = this.notificationSubject.pipe(
      tap(() => startup()),
      filter(isValueDefined),
      finalize(() => shutdown())
    );

   notifications$.subscribe(noti => foo(noti));
   notifications$.subscribe(noti => bar(noti));

然后我们得到了这个变体:

    let isStartedUp = false;
    const internalStartup = () => {
      if(!isStartedUp){
        isStartedUp = true;
        startup();
      }
    }

    const notifications$ = notificationSubject.pipe(
      tap(() => internalStartup()),
      filter(isValueDefined),
      finalize(() => shutdown())
    );

   notifications$.subscribe(noti => foo(noti));
   notifications$.subscribe(noti => bar(noti));

...这是它的工作,但它做得有点太好了,因为现在启动只进行一次(并且只在第一次订阅时)而不是每次创建订阅一次。

我想应该有类似的东西,但我还没找到。

const notifications$ = notificationSubject.pipe(
      initialize(() => startup()),
      finalize(() => shutdown())
    );

您可以使用 defer 在每次订阅时执行一些代码。

export function initialize<T>(initializer: () => void): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    initializer();
    return source;
  });
}
const notifications$ = notificationSubject.pipe(
  initialize(() => startup()),
  finalize(() => shutdown())
);