Angular 处理多个依赖订阅

Angular handle multiple dependent subscriptions

有什么帮助吗?

let notificationsMessages = []
countries.forEach((country: any) => {
    this.isActiveCountry(country.isActive).subscribe((data) => { // // CALL #1 TO API
        country.serverId = data.serverId;
        this.uploadPhotoToApi(country.fileSource).subscribe((response) => { // CALL #2 TO API
          // server return the uploaded file ID
          country.serverFileID = response.serverFileId;
          this.sendCountryToApi(country).subscribe((response) => { // CALL #3 TO API
            this.countriesTable.delete(country.id).then(() => {
              // Delete the uploaded country from local database
              // if is the last country EMIT EVENT
            }, (error) => {
              // if is the last country EMIT EVENT
              notificationsMessages.push(error); // Error to delete country from indexedDB 
            });
          }, (error) => {
              // if is the last country EMIT EVENT
              notificationsMessages.push(error); // // Error to upload country to API
          });
        }, (errorCode) => {
              // if is the last country EMIT EVENT
          notificationsMessages.push(error); // Error on sending file to API
        });
      }, (error) => {
              // if is the last country EMIT EVENT
            notificationsMessages.push(error); // // Error on country identification
      });
  });
  

如何在处理完所有 country 列表后发出事件? 我需要知道有多少国家上传成功,有多少国家上传失败。

例如,如果我有一个包含 50 个国家/地区的列表,当处理完最后一个国家/地区时,我想发出一个包含 2 个数组的事件...如下所示: 成功:[countryId1, countryId2...] 错误:['Country Id 2 failed on upload'、'Country Id 10 failed on file upload']

所有这 3 个调用都是相关的,必须按顺序执行...我无法更改此流程。 我是否应该在 CALL #3 成功以及所有错误函数上发出事件? 谢谢!

尽量避免将许多 .subscribe( 嵌套在彼此内部的诱惑。正如@praveen-soni 提到的,switchMap 可以帮助解决这个问题。

然后要在处理完所有国家/地区后获取状态,我认为 forkJoin 非常适合:它接收一个可观察对象列表,并在所有这些都完成后发出。

如何建立观察列表?您最初有一个国家列表,因此您可以将每个国家映射到处理该国家的可观察对象。我们还可以使用一个 catchError,这样错误就不会关闭整个流,而只会关闭那个特定国家/地区的流。

我认为它看起来像:

const result$ = forkJoin(
  countries.map((country) =>
    this.isActiveCountry(country.isActive).pipe(
      switchMap((data) => {
        country.serverId = data.serverId;
        return this.uploadPhotoToApi(country.fileSource);
      }),
      switchMap((response) => {
        country.serverFileId = response.serverFileId;
        return this.sendCountryToApi(country);
      }),
      switchMap(() => {
        return this.countriesTable.delete(country.id);
      }),
      map(() => {
        // Everything went OK, map it to an OK status
        return {
          type: "success",
        };
      }),
      catchError((error) => {
        // If any step fails, map it to an error type
        return of({
          type: "error",
          error,
        });
      }),
      take(1) // Make sure the observable completes
    )
  )
);

// result$ can now be used as any other observable
result$.subscribe(result => {
  console.log(result);
})

这是一种方法。这可能有点矫枉过正,因为它为您提供了对错误处理的许多精细控制,但基本上总是以相同的方式处理错误。

即便如此,这可能比最直接的解决方案更容易扩展。

这里:

interface TaggedCountry{
  success: boolean,
  country: any,
  error?: any
}

class ArbiratryClassName {

  processCountry(country: any): Observable<TaggedCountry>{

    return this.isActiveCountry(country.isActive).pipe(
      // country now has serverId set
      map(({serverId}) => ({...country, serverId})),
      catchError(error => throwError(() => ({
        success: false,
        country,
        error
      }) as TaggedCountry)),

      mergeMap((resultCountry: any) => this.uploadPhotoToApi(resultCountry.fileSource).pipe(
        // country now has serverFileId set
        map(({serverFileId}) => ({...resultCountry, serverFileId})),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      mergeMap((resultCountry: any) => this.sendCountryToApi(resultCountry).pipe(
        // Ignore response from sendCountryToApi
        mapTo(resultCountry),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      mergeMap((resultCountry: any) => from(this.countriesTable.delete(resultCountry.id)).pipe(
        // Ignore response from countriesTable.delete
        mapTo(resultCountry),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      map((resultCountry: any) => ({
        success: true,
        country: resultCountry
      }) as TaggedCountry),

      // Convert errors into regular emissions
      catchError((tagged:TaggedCountry) => of(tagged))
    );
  }

  processCountries(countries: any[]): Observable<{success: TaggedCountry[], errors: TaggedCountry[]}>{
    return forkJoin(countries.map(c => this.processCountry(c))).pipe(
      map((tagged: TaggedCountry[]) => ({
        success: tagged.filter(tag => tag.success),
        errors: tagged.filter(tag => !tag.success)
      }))
    )
  }

  doSomethingWith(countries: any[]): void {
    this.processCountries(countries).subscribe({
      next: countries => console.log("All countries processed. Result: ", countries),
      complete: () => console.log("There's only one emission, so this should get called immediately after .next() was called"),
      error: err => console.log("This is a surprise, is there an error we didn't catch earlier? Error: ", err)
    })
  }
}

如果看到相同的事情以不同的方式完成会有所帮助,这里是 processCountry

的较短实现
processCountry(country: any): Observable<TaggedCountry>{

  return this.isActiveCountry(country.isActive).pipe(
    tap((res:any) => country.serverId = res.serverId),

    switchMap(_ => this.uploadPhotoToApi(country.fileSource)),
    tap((res:any) => country.serverFileId = res.serverFileId),

    switchMap(_ => this.sendCountryToApi(country)),
    switchMap(_ => this.countriesTable.delete(country.id)),

    // Tag our result as a success
    map(_ => ({
      success: true,
      country
    }) as TaggedCountry),

    // Tag our errors and convert errors into regular emissions
    catchError(error => of(({
      success: false,
      country,
      error
    }) as TaggedCountry))
  );
}