ForkJoin - For Loop - 在每次订阅后处理一些逻辑,在所有可观察对象解决后处理一些逻辑

ForkJoin - ForLoop - Handle some logic after each subscription and some after all obseravables resolved

我在 forloop 中有一个 http 调用。 API调用将 return 一个 HTML 响应,我正在使用它呈现在页面上。

现在我还有一个要求,在所有 API 调用完成后,我必须执行一些逻辑来更新后端的一些数据。

我知道我们可以使用 forkjoin 运算符来捕获可观察值数组,然后更新 BE 数据。但是我无法理解如何处理必须在每个订阅上完成的要求。

for(let item of Items){
    this.myService
            .getMyItemData(item.key)
            .pipe(
              takeUntil(this.destroyed),
              distinctUntilChanged(),
              catchError((e: Error) => {
                this.logger.logError('Error loading', e);
                return of('An Error Occurred');
              })
            ).subscribe((resp) => { 

//使用forkjoin时如何处理这个订阅??

             this.elementRef.nativeElement.html = resp;
    }) 
}
  

现在,在获取所有 itemData 之后,我想对后端执行更新。为此,我正在考虑使用 forkJoin 来捕获所有可观察数据。但同时我想使用订阅代码来呈现 HTML。有人可以帮助我如何实现这一目标。

我的 forkJoin 代码参考*

let arrayOfObservables  = Items.map((item) => this.myService
                .getMyItemData(item.key))

let dataSource =  Rx.Observable.forkJoin(arrayOfObservables);

dataSource.subscribe((resp) => {
  // update my BE data
})

forkJoin 只会在所有源可观察对象完成时发出。鉴于 distinctUntilChanged() 运算符的使用,我假设每个可观察对象都是通知流。

  1. 在那种情况下 combineLatestforkJoin 更合适。它会在任何源可观察对象发出时发出。但请注意:每个 observable 都应该至少发出一次才能触发订阅。如果您希望在某些可观察量尚未发出之前触发订阅,您可以将 startWith(null) 传递给每个可观察源。 另请查看 RxJS zip 函数。 是一个快速运行向下b/n不同的函数。

  2. 要在每次发射后执行某些操作,您可以使用 tap 运算符(如果执行副作用)或 map 运算符(转换数据)。

import { of, combineLatest, Subject } from 'rxjs';
import { tap, takeUntil, startWith, distinctUntilChanged, catchError } from 'rxjs/operators';

combineLatest(
  Items.map((item) => 
    this.myService.getMyItemData(item.key).pipe(
      startWith(null),           // <-- use conditionally (see above)
      distinctUntilChanged(),
      tap((resp) => {
        if (!!resp) {            // <-- avoid `null` from `startWith`
          this.elementRef.nativeElement.html = resp;
        }
      }),
      catchError((e: Error) => {
        this.logger.logError('Error loading', e);
        return of('An Error Occurred');
      })
    )
  )
).pipe(
  takeUntil(this.destroyed)
).subscribe((resp) => {
  // update my BE data
});

回答你的主要问题:

How to handle some logic after each subscription and some after all observables resolved

如果我没理解错的话,您想创建一个 dataSource 来表示来自多个不同调用的数据。您想在每个人调用 returns 时做某事(工作 #1),并在所有调用完成后做其他事情(工作 #2)。

要执行作业 #1,您可以使用 tap operator in the definition of your "individual" call. tap allows you to execute code whenever a value passes through the observable stream. If multiple values will be passing through and you only want to run logic when the observable completes, you can use finalize

forkJoin 创建由许多单独的调用组成的 dataSource,一旦它们全部完成,它将发出所有结果的数组。因此,您可以在订阅中轻松完成工作#2。但是,您也可以在 tapfinalize 中完成工作:

individual$ = key => this.myService.getMyItemData(key).pipe(
    tap(result => console.log(`call #${key} DONE`)) // <-- Do Work #1
);

dataSource$ = forkJoin(Items.map(i => individual$(i.key)).pipe(
    tap(allResults => console.log('all calls DONE')) // <-- Do Work #2
);

dataSource$.subscribe();

dataSource$ 的单个订阅将执行单个调用的逻辑(工作 #1)和“所有调用”的逻辑(工作 #2)。

在控制器中管理大量 elementRefs 并使用 this.elementRef.nativeElement.html 操纵 DOM 似乎会变得混乱。根据您的情况,使用 *ngFor:

绑定到 innerHTML 属性 可能更简单
<ul *ngIf="dataSource$ | async as items">
  <li *ngFor="let item of items" [innerHTML]="item"></li>
</ul>

请注意,如果您使用 async 管道,您甚至不需要在控制器中订阅。

这是一个 StackBlitz 示例。