RxJs forkJoin 连续几个观察值

RxJs forkJoin several observables in a row

更新:

我有一个无法解决的问题。

在我的代码中,我有一个对象列表。 对于每个对象,我必须连续链接 3 个请求,而不是同时链接。但是每个对象都可以并行完成。

所以我使用 forkjoin 在查询完成后执行代码。 但是循环执行所有过程而忽略错误。


所以我更改了下面的代码和 3 个程序 运行 相继。

但是,如果出现错误,则不会考虑 catchError。 因此,代码连接过程 1、2 和 3,即使 1 出错。

procedure3 this.store.proc3 (input3, "ERR") 即使发生错误也必须执行。

this.list.forEach(obj => {
    var input1 = obj...;
    var input2 = obj...;
    var input3 = obj...;
    
    var obs = this.store.input1(input1).pipe(
        catchError(err1 => this.store.proc3(input3, "ERR")),
        concatMap(res1 => this.store.proc2(input2).pipe(
            catchError(err2 => this.store.proc3(input3, "ERR"),
            concatMap(res2 => this.store.proc3(input3, "OK")
        ))
    );
    _obs$.push(obs);
}

forkJoin(_obs$).subscribe(
    results => {
        if(results) {
            this._dialogRef.close(true);
            // ...
        }
    }
);

CURRENT CASE :

  • proc1 OK -> proc2 OK -> proc3(OK)
  • proc1 OK -> proc2 ERR -> proc3(OK)
  • proc1 ERR -> proc2 ERR -> proc3(OK)

DESIRED CASE :

  • proc1 OK -> proc2 OK -> proc3(OK)
  • proc1 OK -> proc2 ERR -> proc3(ERR)
  • proc1 ERR -> proc3(ERR)

INFOS :

  • proc1 -> return true OR exeption
  • proc2 -> return true OR exeption
  • proc3 -> return object (allows you to change the status of the object)

谁有解决办法,我对RxJs不熟悉

目前问题不明确。如果您希望对数组中的每个元素进行 3 次顺序调用,您可以使用 concatMaps 对它们中的每一个进行管道调用。如果你说这 3 个对所有元素的调用可以并行完成,那么它可以全部包含在 forkJoin.

不过,我还不确定您对订阅的期望值是多少。以下代码将为每个元素提供来自 procedure3 的响应数组。

forkJoin(
  this.list.map(obj =>                           // JS `Array#map`
    this.store.procedure1(obj).pipe(             // 1st call
      concatMap(_ =>
        this.store.procedure2(obj).pipe(         // 2nd call
          concatMap(result =>
            iif(
              () => !!result,                    // conditional 3rd call using RxJS `iif`
              this.store.procedure3(this.var),
              this.store.procedure3(this.var)    // <-- same calls for both conditions?
            )
          )
        )
      )
    )
  )
).subscribe(
  ...
);

更新:包括catchError

你快到了。在 RxJS 中,操作符的顺序非常重要。在您的情况下,由于 catchErrorconcatMap 之上,并且由于它将错误转换为 next 发射,因此它会触发以下 concatMap。解决方案是将 catchError 块移动到 concatMap 下方。

尝试以下方法

this.list.forEach(obj => {
    var input1 = obj...;
    var input2 = obj...;
    var input3 = obj...;
    
    var obs = this.store.proc1(input1).pipe(
        concatMap(res1 => this.store.proc2(input2).pipe(
            concatMap(res2 => this.store.proc3(input3, "OK")),
            catchError(err2 => this.store.proc3(input3, "ERR"))
        )),
        catchError(err1 => this.store.proc3(input3, "ERR")),
    );
    _obs$.push(obs);
}