RxJs forkJoin 连续几个观察值
RxJs forkJoin several observables in a row
更新:
我有一个无法解决的问题。
在我的代码中,我有一个对象列表。
对于每个对象,我必须连续链接 3 个请求,而不是同时链接。但是每个对象都可以并行完成。
所以我使用 forkjoin
在查询完成后执行代码。
但是循环执行所有过程而忽略错误。
所以我更改了下面的代码和 3 个程序 运行 相继。
但是,如果出现错误,则不会考虑 catchError。
因此,代码连接过程 1、2 和 3,即使 1 出错。
procedure3 this.store.proc3 (input3, "ERR") 即使发生错误也必须执行。
this.list.forEach(obj => {
var input1 = obj...;
var input2 = obj...;
var input3 = obj...;
var obs = this.store.input1(input1).pipe(
catchError(err1 => this.store.proc3(input3, "ERR")),
concatMap(res1 => this.store.proc2(input2).pipe(
catchError(err2 => this.store.proc3(input3, "ERR"),
concatMap(res2 => this.store.proc3(input3, "OK")
))
);
_obs$.push(obs);
}
forkJoin(_obs$).subscribe(
results => {
if(results) {
this._dialogRef.close(true);
// ...
}
}
);
CURRENT CASE :
- proc1 OK -> proc2 OK -> proc3(OK)
- proc1 OK -> proc2 ERR -> proc3(OK)
- proc1 ERR -> proc2 ERR -> proc3(OK)
DESIRED CASE :
- proc1 OK -> proc2 OK -> proc3(OK)
- proc1 OK -> proc2 ERR -> proc3(ERR)
- proc1 ERR -> proc3(ERR)
INFOS :
- proc1 -> return true OR exeption
- proc2 -> return true OR exeption
- proc3 -> return object (allows you to change the status of the object)
谁有解决办法,我对RxJs不熟悉
目前问题不明确。如果您希望对数组中的每个元素进行 3 次顺序调用,您可以使用 concatMap
s 对它们中的每一个进行管道调用。如果你说这 3 个对所有元素的调用可以并行完成,那么它可以全部包含在 forkJoin
.
中
不过,我还不确定您对订阅的期望值是多少。以下代码将为每个元素提供来自 procedure3
的响应数组。
forkJoin(
this.list.map(obj => // JS `Array#map`
this.store.procedure1(obj).pipe( // 1st call
concatMap(_ =>
this.store.procedure2(obj).pipe( // 2nd call
concatMap(result =>
iif(
() => !!result, // conditional 3rd call using RxJS `iif`
this.store.procedure3(this.var),
this.store.procedure3(this.var) // <-- same calls for both conditions?
)
)
)
)
)
)
).subscribe(
...
);
更新:包括catchError
你快到了。在 RxJS 中,操作符的顺序非常重要。在您的情况下,由于 catchError
在 concatMap
之上,并且由于它将错误转换为 next
发射,因此它会触发以下 concatMap
。解决方案是将 catchError
块移动到 concatMap
下方。
尝试以下方法
this.list.forEach(obj => {
var input1 = obj...;
var input2 = obj...;
var input3 = obj...;
var obs = this.store.proc1(input1).pipe(
concatMap(res1 => this.store.proc2(input2).pipe(
concatMap(res2 => this.store.proc3(input3, "OK")),
catchError(err2 => this.store.proc3(input3, "ERR"))
)),
catchError(err1 => this.store.proc3(input3, "ERR")),
);
_obs$.push(obs);
}
更新:
我有一个无法解决的问题。
在我的代码中,我有一个对象列表。 对于每个对象,我必须连续链接 3 个请求,而不是同时链接。但是每个对象都可以并行完成。
所以我使用 forkjoin
在查询完成后执行代码。
但是循环执行所有过程而忽略错误。
所以我更改了下面的代码和 3 个程序 运行 相继。
但是,如果出现错误,则不会考虑 catchError。 因此,代码连接过程 1、2 和 3,即使 1 出错。
procedure3 this.store.proc3 (input3, "ERR") 即使发生错误也必须执行。
this.list.forEach(obj => {
var input1 = obj...;
var input2 = obj...;
var input3 = obj...;
var obs = this.store.input1(input1).pipe(
catchError(err1 => this.store.proc3(input3, "ERR")),
concatMap(res1 => this.store.proc2(input2).pipe(
catchError(err2 => this.store.proc3(input3, "ERR"),
concatMap(res2 => this.store.proc3(input3, "OK")
))
);
_obs$.push(obs);
}
forkJoin(_obs$).subscribe(
results => {
if(results) {
this._dialogRef.close(true);
// ...
}
}
);
CURRENT CASE :
- proc1 OK -> proc2 OK -> proc3(OK)
- proc1 OK -> proc2 ERR -> proc3(OK)
- proc1 ERR -> proc2 ERR -> proc3(OK)
DESIRED CASE :
- proc1 OK -> proc2 OK -> proc3(OK)
- proc1 OK -> proc2 ERR -> proc3(ERR)
- proc1 ERR -> proc3(ERR)
INFOS :
- proc1 -> return true OR exeption
- proc2 -> return true OR exeption
- proc3 -> return object (allows you to change the status of the object)
谁有解决办法,我对RxJs不熟悉
目前问题不明确。如果您希望对数组中的每个元素进行 3 次顺序调用,您可以使用 concatMap
s 对它们中的每一个进行管道调用。如果你说这 3 个对所有元素的调用可以并行完成,那么它可以全部包含在 forkJoin
.
不过,我还不确定您对订阅的期望值是多少。以下代码将为每个元素提供来自 procedure3
的响应数组。
forkJoin(
this.list.map(obj => // JS `Array#map`
this.store.procedure1(obj).pipe( // 1st call
concatMap(_ =>
this.store.procedure2(obj).pipe( // 2nd call
concatMap(result =>
iif(
() => !!result, // conditional 3rd call using RxJS `iif`
this.store.procedure3(this.var),
this.store.procedure3(this.var) // <-- same calls for both conditions?
)
)
)
)
)
)
).subscribe(
...
);
更新:包括catchError
你快到了。在 RxJS 中,操作符的顺序非常重要。在您的情况下,由于 catchError
在 concatMap
之上,并且由于它将错误转换为 next
发射,因此它会触发以下 concatMap
。解决方案是将 catchError
块移动到 concatMap
下方。
尝试以下方法
this.list.forEach(obj => {
var input1 = obj...;
var input2 = obj...;
var input3 = obj...;
var obs = this.store.proc1(input1).pipe(
concatMap(res1 => this.store.proc2(input2).pipe(
concatMap(res2 => this.store.proc3(input3, "OK")),
catchError(err2 => this.store.proc3(input3, "ERR"))
)),
catchError(err1 => this.store.proc3(input3, "ERR")),
);
_obs$.push(obs);
}