RxJs 6:管理数组迭代和延迟
RxJs 6 : Manage array iteration and delay
我需要有关管理数组迭代延迟的帮助。
关于我的https://jsfiddle.net/mlefree/vrL813j2/93/,两个问题:
- 如何为每个迭代操作添加延迟?
- 如何减少所有迭代计算值?
```
...
const arrayAsObservable = of(null).pipe(
delay(500),
switchMap(_ => getObjectWithArrayInPromise()),
map(val => {
log('array', val);
return (val.myArray);
}),
switchMap(val => from(val))
);
const eachElementAsObservable = arrayAsObservable.pipe(
delay(500), // Not working : we want to wait 500ms more for each value
map(val => {
log('value', val);
return val ;
}),
switchMap(val => getNewValueInPromise(val)),
map(val => {
// Not working : why not all new values ?
log('value after computing (KO)', val);
return (val);
})
);
const summarizeAsObservable = eachElementAsObservable.pipe(
// Not working : we want to sum all new values
map(val => {
log('value before reduce (KO)', val);
return val ;
}),
reduce((a,b) => a + b)
);
summarizeAsObservable.subscribe(msg => {
log('We did a total of (KO)', msg);
});
```
总的来说代码有点太复杂了,有几行成为你代码的根本问题。
你在计算 (KO) 后只收到一个值的原因是你使用了 switchMap
一旦源发出,它将取消订阅内部 observable,所以你总是得到最后一个发出的值。我还将延迟更改为 timer
和 mapTo
发射值
const eachElementAsObservable = arrayAsObservable.pipe(
concatMap(value => timer(1500).pipe(mapTo(value))), // Not working : we want to wait 500ms for each value
map(val => {
log('value', val);
return val;
}),
mergeMap(val => from(getNewValueInPromise(val))),
map(val => {
// Not working : why not all new values ?
console.log('value after computing (KO)', val);
return (val);
})
);
这是更新后的 fiddle https://jsfiddle.net/fancheung/vrL813j2/109/
代码没有完全正常工作,因为你通过 reject in promise 抛出一个错误,这将导致 observable 停止发射,你需要在流
的某个地方放置一个 catchError
我需要有关管理数组迭代延迟的帮助。
关于我的https://jsfiddle.net/mlefree/vrL813j2/93/,两个问题:
- 如何为每个迭代操作添加延迟?
- 如何减少所有迭代计算值?
```
...
const arrayAsObservable = of(null).pipe(
delay(500),
switchMap(_ => getObjectWithArrayInPromise()),
map(val => {
log('array', val);
return (val.myArray);
}),
switchMap(val => from(val))
);
const eachElementAsObservable = arrayAsObservable.pipe(
delay(500), // Not working : we want to wait 500ms more for each value
map(val => {
log('value', val);
return val ;
}),
switchMap(val => getNewValueInPromise(val)),
map(val => {
// Not working : why not all new values ?
log('value after computing (KO)', val);
return (val);
})
);
const summarizeAsObservable = eachElementAsObservable.pipe(
// Not working : we want to sum all new values
map(val => {
log('value before reduce (KO)', val);
return val ;
}),
reduce((a,b) => a + b)
);
summarizeAsObservable.subscribe(msg => {
log('We did a total of (KO)', msg);
});
```
总的来说代码有点太复杂了,有几行成为你代码的根本问题。
你在计算 (KO) 后只收到一个值的原因是你使用了 switchMap
一旦源发出,它将取消订阅内部 observable,所以你总是得到最后一个发出的值。我还将延迟更改为 timer
和 mapTo
发射值
const eachElementAsObservable = arrayAsObservable.pipe(
concatMap(value => timer(1500).pipe(mapTo(value))), // Not working : we want to wait 500ms for each value
map(val => {
log('value', val);
return val;
}),
mergeMap(val => from(getNewValueInPromise(val))),
map(val => {
// Not working : why not all new values ?
console.log('value after computing (KO)', val);
return (val);
})
);
这是更新后的 fiddle https://jsfiddle.net/fancheung/vrL813j2/109/ 代码没有完全正常工作,因为你通过 reject in promise 抛出一个错误,这将导致 observable 停止发射,你需要在流
的某个地方放置一个catchError