当有人订阅 observable B 时订阅 observable A

Subscribe to observable A when someone subscribe to observable B

我有一个 API 调用是可观察的。 我还有一个可观察的 B 来跟踪该查询的进度:

  generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File) {
    const generation$ = new BehaviorSubject<ActionWithResult<{ url: string }>>({
      state: 'INIT',
      result: null,
    });

    const complete = function (result: ActionWithResult<{ url: string }>) {
      generation$.next(result);
      generation$.complete();
    };

    this.generateUploadUrl(uploadConfig).pipe(
      switchMap((result) => {
        generation$.next({ state: 'UPLOADING', result: null });
        const url = result.data.generateUploadUrl.url || '';
        return this.httpClient
          .pipe(
            tap(() => {
              complete({ state: 'SUCCEEDED', result: { url } });
            }),
            catchError((e) => {
              complete({ state: 'FAILED', result: null });
              return throwError(() => e);
            })
          );
      }),
      catchError((e) => {
        complete({ state: ActionState.FAILED, result: null });
        return throwError(() => e);
      })
    ).subscribe();

    return generation$
  }

现在,我可以了

this.generateUrl$(option, file).subscribe(e=> {console.log(e)})

我会得到查询的状态,一旦成功就会得到结果。

但问题是,如果有人犯错并简单地做:

this.generateUrl$()没有订阅,不会跟踪结果,但API仍然会被调用。

我想将 api observable this.generateUploadUrl 与跟踪 observable generation$

绑定

类似

return generation$.pipe(
    whenSomeoneSubscribeToIt(() => {
       this.generateUploadUrl(uploadConfig)
       .pipe(//all the stuff above)
       .subscribe()
    })
)

可能吗?

这可能是在这种情况下不使用像 BehaviorSubject 这样的多播的原因之一。使用 new Observable() 函数创建一个可观察对象更适合您。

请注意,我还调整了一些其他实现细节,例如使用 tap 运算符的 finalizeerror 回调而不是内部 catchError 运算符。

尝试以下方法

import { Observable, Observer, throwError } from 'rxjs';
import { catchError, finalize, switchMap, tap } from 'rxjs/operators';

generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File): Observable<any> {
  return new Observable((generation$: Observer) => {
    generation$.next({ state: 'INIT', result: null });
    this.generateUploadUrl(uploadConfig).pipe(
      switchMap((result: any) => {
        generation$.next({ state: 'UPLOADING', result: null });
        const url = result.data.generateUploadUrl.url || '';
        return this.httpClient.get(someUrl).pipe(
          tap({
            next: (res: any) => generation$.next({ state: 'SUCCEEDED', result: { url } }),
            error: (error: any) => generation$.next({ state: 'FAILED', result: null })
          }),
          finalize(() => generation$.complete())
        );
      }),
      catchError((error: any) => {
        generation$.next({ state: ActionState.FAILED, result: null });
        generation$.complete();
        return throwError(() => error);
      })
    ).subscribe();
  });
}

天真的解决方案

我创建了一个自定义运算符,它可以 运行 在订阅流之后但在接收到它的第一次发射之前产生效果。我将其命名为 initialize,但它几乎就是您用 whenSomeoneSubscribeToIt 描述的内容。

这里是:

export function initialize<T>(
  effect: () => void
): MonoTypeOperatorFunction<T> {

  return s => new Observable(ob => {
    const sub = s.subscribe(ob);
    effect();
    return sub;
  });

}

现在您可以让您的订阅生效:

generateUrl$(
  uploadConfig: GenerateUploadUrlVariables, 
  file: File
) {

  // Some stuff here

  return generation$.pipe(
    initialize(() => {
      this.generateUploadUrl(uploadConfig).pipe(
        // some more stuff here
      ).subscribe();
    })
  );

}

地道的 RxJS

您根本不需要上面的 BehaviourSubject。您基本上是在使用它来强制生成流。

正如另一个答案所指出的,您可以直接为观察者创建相同的流。这种方法要好一些,但仍然会遇到使用行为主体会遇到的许多问题。例如,该答案创建了一个不保证完成的可观察对象,并且在您取消订阅时不管理它的内部订阅。

我想说惯用的 RxJS 方法是在使用 RxJS 运算符后以声明方式创建流。

generateUrl$(
  uploadConfig: GenerateUploadUrlVariables, 
  file: File
): Observable<StateWithResult> {

  return this.generateUploadUrl(uploadConfig).pipe(
    map(result => result.data.generateUploadUrl.url || ''),
    switchMap(url => this.httpClient.pipe(
      ignoreElements(),
      startWith({ state: 'UPLOADING', result: null }),
      concatWith(of({ state: 'SUCCEEDED', result: { url } }))
    )),
    catchError(_ => of({ state: 'FAILED', result: null })),
    startWith({ state: 'INIT', result: null })
  );

}

我在这里所做的一个更改是我不会重新抛出您的错误。看起来你并没有在下游使用它们,所以这是一种更干净的处理它们的方法。您可以(当然!)回去重新抛出它们。

即使消费者没有调用 .subscribe(),http 调用也会触发的原因是因为您在函数内部的 generateUploadUrl 上调用 .subscribe()

你可以得到你想要的行为(我想说的是正确的反应行为)通过简单地返回由 generateUploadUrl 返回的可观察对象并管道它的结果到您想要的值。我们可以使用 startWith 发出您的初始值:

  generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File): Observable<ActionWithResult> {

    return this.generateUploadUrl(uploadConfig).pipe(
      map(result => result.data.generateUploadUrl.url || ''),
      switchMap(url => this.httpClient.pipe(
        map(() => ({ state: ActionState.Succeeded, result: { url } })),
        startWith({ state: ActionState.Uploading, result: null })
      )),
      catchError(() => of({ state: ActionState.Failed, result: null })),
      startWith({ state: ActionState.Init, result: null })
    );
  }

这是一个有效的 StackBlitz 演示。

请注意,您没有在函数内部进行订阅,这首先导致了您的不良行为。

如果您需要多播行为(如果您有多个订阅者),您只需在管道的末尾添加一个 shareReplay