如何从其中一个未发出的所有 rxjs 流中获取数据?

How to get data from all rxjs streams where one of them are not emitted?

我有一些指示更改的 Rxjs 流:

reportChanges$
objectPropChanges$
textPropsChanges$

前两个有默认值,最新的没有。

我尝试收集所有更改并存储它们:

   this.changes$ = combineLatest([
            this.reportChanges$,
            this.objectPropChanges$,
            this.textPropsChanges$,
        ]).pipe(
            debounceTime(this.debouncetime),
            map(([reportProperties, reportObjectsProperties, textProperties]) => {
                return { reportObjectsProperties, reportProperties, textProperties };
            }),
        );

然后存储它:

   this.changes$
            .pipe(
                skip(1),
                switchMap((properties: ReportPropertiesRequest) => this.save(this.registryId, properties)),
            )
            .subscribe();
    }

问题是 combineLatest 仅在所有流都具有最新值时才起作用。在我的例子中 textPropsChanges$ 没有。

如何收集所有更改并存储它们?

我的目的是收集异步发生的所有更改并将它们立即存储在存储器中。

可能的解决方案是使用 startWith 或使用 BehSubject 为所有 stremas 设置起始值。但就我而言,我不能使用它,因为用户应该自己制作 chnages。

您可以忽略一些默认标记。所以这里我用 null 作为例子:

this.changes$ = combineLatest([
  this.reportChanges$,
  this.objectPropChanges$,
  this.textPropsChanges$.pipe(startWith(null)),
]).pipe(
  debounceTime(this.debouncetime),
  map(([reportProperties, reportObjectsProperties, textProperties]) => 
      textProperties != null ? // Only include if not null
      { reportObjectsProperties, reportProperties, textProperties } : 
      { reportObjectsProperties, reportProperties }
  ),
);

可能您可以拥有本地对象 changes = {}。然后分别监听每个流并设置chnages到这个对象:

this.reportChanges$.subscribe((data) => this.changes.report = data;
this.objectPropChanges$.subscribe((data) => this.changes.objectProp = data;