合并来自可观察对象的数组,而无需等待所有数组完成

Combine arrays from observables without waiting for all to complete

在我的应用程序中,我以一定的时间间隔从远程服务器检索数据。数据被格式化为对象数组,并通过 angular 异步管道显示在 HTML 模板中。
我正在使用 shareReplay() 来提供缓存功能:

  private refresherToggleSales$: Subject<void>;
  private displayLastSales$: Observable<Sale[]>;

  public InjectSale$: Subject<Sale[]>;
  
  [...]

   get LastSales() : Observable<Sale[]> {
    if (!this.displayLastSales$) {
      const rTimer$ = timer(0, DefaultRefresh);

      this.displayLastSales$ = rTimer$.pipe(
        switchMap(_ => this.getLastSales()),
        shareReplay(this.CACHE_SIZE),
        takeUntil(this.refresherToggleSales$)
      );
    }

    return merge(this.displayLastSales$, this.InjectSale$);
  }

  private getLastSales() : Observable<Sale[]> {
    // get data from server
  }

每当在应用程序中手动输入销售时,我希望它立即显示,而不必等待它来自服务器。

但是,如果我用 1 元素数组调用 InjectLastSale$.next(),它将完全替换所有以前的数据。
如何合并这两个流(无需等待另一个流完成)?

谢谢

您可以使用 combineLatest 运算符

combineLatest(this.displayLastSales$, this.InjectSale$).subscribe(
(displayLastSales, InjectSale) => {
        console.log(displayLastSales, InjectSale )
      })

现在,无论何时发出数据,您都会立即看到它,而无需等待其他可观察对象发出数据。

你很接近!只需更改放置 merge.

的位置即可
get LastSales() : Observable<Sale[]> {
    
  if (!this.displayLastSales$) {
    this.displayLastSales$ = timer(0, DefaultRefresh).pipe(
      switchMap(_ => this.getLastSales().pipe(
        mergeWith(this.InjectSale$),
        scan((acc, vals) => [...vals, ...acc], <Sale[]>[])
      )),
      shareReplay(this.CACHE_SIZE),
      takeUntil(this.refresherToggleSales$)
    );
  }

  return this.displayLastSales$;
}