如何序列化可观察数组的执行

How to serialize execution of array of observables

我有一个验证过程,可以逐行验证 table 中的数据。因为每个行验证都使用一个共享资源,所以对它的访问必须序列化。

public validate():Observable<boolean>{
    const rowValidations:Observable<boolean>[] = dataRows.map(row=>this.validateSingleRow(row);
    return forkJoin(...rowValidations).pipe(
      map(results=>results.every(r=>r))
    )
}

如果我理解正确,forkJoin 不会像 concat 那样等待每个 observable 完成后再订阅下一个,所以这可能会失败。 concat 另一方面,将所有可观察对象序列化为单个流。

我怎样才能像 concat 那样获得订阅订单,但是像 forkJoin 一样拥有每个可观察对象的结果数组,有效地同步每个内部可观察对象的执行(如 Javas synchronzied validateSingleRow )?

这样的事情对你有用吗?

class SomeClass {
  dataRows = [1, 2, 3];

  public validate(): Observable<boolean[]> {
    return this.resolveSequentially(this.dataRows);
  }

  private validateSequentially<T>([cur, ...obs]: T[]): Observable<boolean[]> {
    return cur
      ? this.validateSingleRow(cur).pipe(
          switchMap((x) =>
            this.validateSequentially(obs).pipe(map((arr) => [x, ...arr]))
          )
        )
      : of([]);
  }

  // Mock
  private validateSingleRow(cur: any) {
    console.log(`Validating ${cur}...`);
    return of(Math.floor(Math.random() * 2) === 1).pipe(
      delay(1000),
      tap((x) => console.log(`Result: ${x}`))
    );
  }
}

const obj = new SomeClass();

obj.validate().subscribe(console.log);

StackBlitz demo

实际上,如果你知道每个 this.validateSingleRow(row) 总是只发射一次你可以使用 toArray():

concat(...rowValidations).pipe(
  toArray(),
);

concat 将保证正确的顺序,toArray() 会将所有发射收集到一个数组中,并在源 Observable 完成后重新发射。

否则,如果 validateSingleRow 可能会发出多次,而你总是只想要它的最后一个值,你可以使用 scan:

const indexedRowValidations = rowValidations.map((o, index) => o.pipe(
  map(result => [index, result]),
));

concat(...indexedRowValidations ).pipe(
  scan((acc, [index, result]) => {
    acc[index] = result;
    return acc;
  }, {}),
  takeLast(1),
);

(我没有测试,但我相信你明白了:))。

满足我要求的解决方案比人们想象的要简单。我用 concattoArray() 像这样

const rowValidations:Observable<boolean>[] = dataRows.map(row=>defer(()=>this.validateSingleRow(row));
return concat(...rowValidations).pipe(
  toArray(),
  map(results=>results.every(r=>r))
)

所以validateSingleRow一个一个执行,toArray将boolean流转化为boolean数组