RxJS 每 x 秒从数组中发出值,使用该值调用一个函数,如果失败则重试
RxJS emit values from array every x seconds, call a function with that value, retry if failed
我有一个数组,值的类型无关紧要。我想做的是每 x 秒发出一个值,用该值调用一个函数,如果该函数由于某种原因失败,则在 y 秒后重试(可以是一个简单的常量,这里不需要任何增量的东西) .
到目前为止我有什么
Rx.Observable
.interval(500)
.take(arr.length)
.map(idx => arr[idx])
.flatMap(dt => randomFunc(dt))
.catch(e => conosle.log(e))
.retry(5)
.subscribe();
function randomFunc(dt) {
return Rx.Observable.create(observer => {
if (dt === 'random') {
return observer.error(`error`);
} else {
return observer.next();
}
});
}
这里有 2 个问题:
1: 当 randomFunc
returns 出现错误时,整个链似乎重新开始。我只需要失败的重试。
2:catch
实际上从未记录任何错误,即使它似乎在出错时重试。
对于第一个问题,我尝试了 switchMap
而不是 flatMap
,如下所示:
Rx.Observable
.interval(500)
.take(arr.length)
.map(idx => arr[idx])
.switchMap(dt => randomFunc(dt)
.catch(e => conosle.log(e))
.retry(5)
)
.subscribe();
这样看来它只重试了失败的,但仍然没有记录任何错误,我什至不确定 switchMap
在这里是否合适(我真的是一个 Rx 菜鸟)。
任何帮助将不胜感激,谢谢!
When randomFunc
returns an error it seems that the whole chain starts
over. I only need the failed one to retry.`
在 RxJs 中,当将 Observables 组合在一起时,错误也会传播,未捕获的错误将导致取消订阅。
您在 switchMap
中使用 catch 的想法是正确的。虽然 switchMap
一次只会 flatten 一个 Observable,当下一个值被映射时,前一个 Observable 将被取消订阅(它是 switched出)
// Observable from array
Rx.Observable.from(arr)
.concatMap(value =>
// Put a 500 ms delay between each value
Rx.Observable.timer(500).map(_ => value)
)
.flatMap(dt =>
randomFunc(dt)
.retryWhen(errs =>
errs
.do(err => console.error(err))
// Retry at most 5 times
.take(5)
// Retry after 500ms
.delay(500)
)
)
.subscribe();
catch never actually logs any error, even though it seems to retry on
error.
传递给 catch 的函数应该 return 一个 Observable 例如:
Observable.throw(new Error())
.catch(e =>
(console.error(e), Observable.of('backup value'))
)
.subscribe();
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-catch
有几件事需要注意。 retry()
运算符只是重新订阅它的源,所以如果你不想再次开始整个迭代,你可以 merge/concat 异步函数进入链。
Rx.Observable.from(arr)
.concatMap(val => {
let attempts = 0;
return Rx.Observable.of(val)
.delay(500)
.concatMap(val => randomFunc(val)
.catch((err, caught) => {
console.log('log error');
if (attempts++ === 1) {
return Rx.Observable.of(err);
} else {
return caught;
}
})
);
})
.subscribe(val => console.log(val));
function randomFunc(dt) {
return Rx.Observable.create(observer => {
if (dt === 'random') {
observer.error(`error received ${dt}`);
} else {
observer.next(dt);
observer.complete();
}
});
}
观看现场演示:https://jsbin.com/qacamab/7/edit?js,console
这会打印到控制台:
1
2
3
4
log error
log error
error received random
6
7
8
9
10
catch()
运算符是最重要的部分。它的选择器函数有两个参数:
err
- 发生的错误
caught
- 原始的 Observable。
如果我们从选择器函数 return caught
我们将重新订阅源 Observable(与 retry(1)
相同)。由于您想记录每条错误消息,我们必须使用 catch()
而不是 retry()
。通过 returning Rx.Observable.of(err)
我们进一步传播错误,然后它会作为 next
通知被订阅者接收。我们也可以 return 只是 Observable.empty()
来简单地忽略错误。
我有一个数组,值的类型无关紧要。我想做的是每 x 秒发出一个值,用该值调用一个函数,如果该函数由于某种原因失败,则在 y 秒后重试(可以是一个简单的常量,这里不需要任何增量的东西) .
到目前为止我有什么
Rx.Observable
.interval(500)
.take(arr.length)
.map(idx => arr[idx])
.flatMap(dt => randomFunc(dt))
.catch(e => conosle.log(e))
.retry(5)
.subscribe();
function randomFunc(dt) {
return Rx.Observable.create(observer => {
if (dt === 'random') {
return observer.error(`error`);
} else {
return observer.next();
}
});
}
这里有 2 个问题:
1: 当 randomFunc
returns 出现错误时,整个链似乎重新开始。我只需要失败的重试。
2:catch
实际上从未记录任何错误,即使它似乎在出错时重试。
对于第一个问题,我尝试了 switchMap
而不是 flatMap
,如下所示:
Rx.Observable
.interval(500)
.take(arr.length)
.map(idx => arr[idx])
.switchMap(dt => randomFunc(dt)
.catch(e => conosle.log(e))
.retry(5)
)
.subscribe();
这样看来它只重试了失败的,但仍然没有记录任何错误,我什至不确定 switchMap
在这里是否合适(我真的是一个 Rx 菜鸟)。
任何帮助将不胜感激,谢谢!
When
randomFunc
returns an error it seems that the whole chain starts over. I only need the failed one to retry.`
在 RxJs 中,当将 Observables 组合在一起时,错误也会传播,未捕获的错误将导致取消订阅。
您在 switchMap
中使用 catch 的想法是正确的。虽然 switchMap
一次只会 flatten 一个 Observable,当下一个值被映射时,前一个 Observable 将被取消订阅(它是 switched出)
// Observable from array
Rx.Observable.from(arr)
.concatMap(value =>
// Put a 500 ms delay between each value
Rx.Observable.timer(500).map(_ => value)
)
.flatMap(dt =>
randomFunc(dt)
.retryWhen(errs =>
errs
.do(err => console.error(err))
// Retry at most 5 times
.take(5)
// Retry after 500ms
.delay(500)
)
)
.subscribe();
catch never actually logs any error, even though it seems to retry on error.
传递给 catch 的函数应该 return 一个 Observable 例如:
Observable.throw(new Error())
.catch(e =>
(console.error(e), Observable.of('backup value'))
)
.subscribe();
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-catch
有几件事需要注意。 retry()
运算符只是重新订阅它的源,所以如果你不想再次开始整个迭代,你可以 merge/concat 异步函数进入链。
Rx.Observable.from(arr)
.concatMap(val => {
let attempts = 0;
return Rx.Observable.of(val)
.delay(500)
.concatMap(val => randomFunc(val)
.catch((err, caught) => {
console.log('log error');
if (attempts++ === 1) {
return Rx.Observable.of(err);
} else {
return caught;
}
})
);
})
.subscribe(val => console.log(val));
function randomFunc(dt) {
return Rx.Observable.create(observer => {
if (dt === 'random') {
observer.error(`error received ${dt}`);
} else {
observer.next(dt);
observer.complete();
}
});
}
观看现场演示:https://jsbin.com/qacamab/7/edit?js,console
这会打印到控制台:
1
2
3
4
log error
log error
error received random
6
7
8
9
10
catch()
运算符是最重要的部分。它的选择器函数有两个参数:
err
- 发生的错误caught
- 原始的 Observable。
如果我们从选择器函数 return caught
我们将重新订阅源 Observable(与 retry(1)
相同)。由于您想记录每条错误消息,我们必须使用 catch()
而不是 retry()
。通过 returning Rx.Observable.of(err)
我们进一步传播错误,然后它会作为 next
通知被订阅者接收。我们也可以 return 只是 Observable.empty()
来简单地忽略错误。