如何序列化可观察数组的执行
How to serialize execution of array of observables
我有一个验证过程,可以逐行验证 table 中的数据。因为每个行验证都使用一个共享资源,所以对它的访问必须序列化。
public validate():Observable<boolean>{
const rowValidations:Observable<boolean>[] = dataRows.map(row=>this.validateSingleRow(row);
return forkJoin(...rowValidations).pipe(
map(results=>results.every(r=>r))
)
}
如果我理解正确,forkJoin
不会像 concat
那样等待每个 observable 完成后再订阅下一个,所以这可能会失败。 concat
另一方面,将所有可观察对象序列化为单个流。
我怎样才能像 concat
那样获得订阅订单,但是像 forkJoin
一样拥有每个可观察对象的结果数组,有效地同步每个内部可观察对象的执行(如 Javas synchronzied validateSingleRow
)?
这样的事情对你有用吗?
class SomeClass {
dataRows = [1, 2, 3];
public validate(): Observable<boolean[]> {
return this.resolveSequentially(this.dataRows);
}
private validateSequentially<T>([cur, ...obs]: T[]): Observable<boolean[]> {
return cur
? this.validateSingleRow(cur).pipe(
switchMap((x) =>
this.validateSequentially(obs).pipe(map((arr) => [x, ...arr]))
)
)
: of([]);
}
// Mock
private validateSingleRow(cur: any) {
console.log(`Validating ${cur}...`);
return of(Math.floor(Math.random() * 2) === 1).pipe(
delay(1000),
tap((x) => console.log(`Result: ${x}`))
);
}
}
const obj = new SomeClass();
obj.validate().subscribe(console.log);
实际上,如果你知道每个 this.validateSingleRow(row)
总是只发射一次你可以使用 toArray()
:
concat(...rowValidations).pipe(
toArray(),
);
concat
将保证正确的顺序,toArray()
会将所有发射收集到一个数组中,并在源 Observable 完成后重新发射。
否则,如果 validateSingleRow
可能会发出多次,而你总是只想要它的最后一个值,你可以使用 scan
:
const indexedRowValidations = rowValidations.map((o, index) => o.pipe(
map(result => [index, result]),
));
concat(...indexedRowValidations ).pipe(
scan((acc, [index, result]) => {
acc[index] = result;
return acc;
}, {}),
takeLast(1),
);
(我没有测试,但我相信你明白了:))。
满足我要求的解决方案比人们想象的要简单。我用 concat
和 toArray()
像这样
const rowValidations:Observable<boolean>[] = dataRows.map(row=>defer(()=>this.validateSingleRow(row));
return concat(...rowValidations).pipe(
toArray(),
map(results=>results.every(r=>r))
)
所以validateSingleRow
一个一个执行,toArray
将boolean流转化为boolean数组
我有一个验证过程,可以逐行验证 table 中的数据。因为每个行验证都使用一个共享资源,所以对它的访问必须序列化。
public validate():Observable<boolean>{
const rowValidations:Observable<boolean>[] = dataRows.map(row=>this.validateSingleRow(row);
return forkJoin(...rowValidations).pipe(
map(results=>results.every(r=>r))
)
}
如果我理解正确,forkJoin
不会像 concat
那样等待每个 observable 完成后再订阅下一个,所以这可能会失败。 concat
另一方面,将所有可观察对象序列化为单个流。
我怎样才能像 concat
那样获得订阅订单,但是像 forkJoin
一样拥有每个可观察对象的结果数组,有效地同步每个内部可观察对象的执行(如 Javas synchronzied validateSingleRow
)?
这样的事情对你有用吗?
class SomeClass {
dataRows = [1, 2, 3];
public validate(): Observable<boolean[]> {
return this.resolveSequentially(this.dataRows);
}
private validateSequentially<T>([cur, ...obs]: T[]): Observable<boolean[]> {
return cur
? this.validateSingleRow(cur).pipe(
switchMap((x) =>
this.validateSequentially(obs).pipe(map((arr) => [x, ...arr]))
)
)
: of([]);
}
// Mock
private validateSingleRow(cur: any) {
console.log(`Validating ${cur}...`);
return of(Math.floor(Math.random() * 2) === 1).pipe(
delay(1000),
tap((x) => console.log(`Result: ${x}`))
);
}
}
const obj = new SomeClass();
obj.validate().subscribe(console.log);
实际上,如果你知道每个 this.validateSingleRow(row)
总是只发射一次你可以使用 toArray()
:
concat(...rowValidations).pipe(
toArray(),
);
concat
将保证正确的顺序,toArray()
会将所有发射收集到一个数组中,并在源 Observable 完成后重新发射。
否则,如果 validateSingleRow
可能会发出多次,而你总是只想要它的最后一个值,你可以使用 scan
:
const indexedRowValidations = rowValidations.map((o, index) => o.pipe(
map(result => [index, result]),
));
concat(...indexedRowValidations ).pipe(
scan((acc, [index, result]) => {
acc[index] = result;
return acc;
}, {}),
takeLast(1),
);
(我没有测试,但我相信你明白了:))。
满足我要求的解决方案比人们想象的要简单。我用 concat
和 toArray()
像这样
const rowValidations:Observable<boolean>[] = dataRows.map(row=>defer(()=>this.validateSingleRow(row));
return concat(...rowValidations).pipe(
toArray(),
map(results=>results.every(r=>r))
)
所以validateSingleRow
一个一个执行,toArray
将boolean流转化为boolean数组