RxJS 每 x 秒从数组中发出值,使用该值调用一个函数,如果失败则重试

RxJS emit values from array every x seconds, call a function with that value, retry if failed

我有一个数组,值的类型无关紧要。我想做的是每 x 秒发出一个值,用该值调用一个函数,如果该函数由于某种原因失败,则在 y 秒后重试(可以是一个简单的常量,这里不需要任何增量的东西) .

到目前为止我有什么

Rx.Observable
    .interval(500)
    .take(arr.length)
    .map(idx => arr[idx])
    .flatMap(dt => randomFunc(dt))
    .catch(e => conosle.log(e))
    .retry(5)
    .subscribe();

function randomFunc(dt) {
    return Rx.Observable.create(observer => {
        if (dt === 'random') {
            return observer.error(`error`);
        } else {
            return observer.next();
        }
    });
}

这里有 2 个问题:

1: 当 randomFunc returns 出现错误时,整个链似乎重新开始。我只需要失败的重试。

2:catch 实际上从未记录任何错误,即使它似乎在出错时重试。

对于第一个问题,我尝试了 switchMap 而不是 flatMap,如下所示:

Rx.Observable
    .interval(500)
    .take(arr.length)
    .map(idx => arr[idx])
    .switchMap(dt => randomFunc(dt)
        .catch(e => conosle.log(e))
        .retry(5)
    )
    .subscribe();

这样看来它只重试了失败的,但仍然没有记录任何错误,我什至不确定 switchMap 在这里是否合适(我真的是一个 Rx 菜鸟)。

任何帮助将不胜感激,谢谢!

When randomFunc returns an error it seems that the whole chain starts over. I only need the failed one to retry.`

在 RxJs 中,当将 Observables 组合在一起时,错误也会传播,未捕获的错误将导致取消订阅。

您在 switchMap 中使用 catch 的想法是正确的。虽然 switchMap 一次只会 flatten 一个 Observable,当下一个值被映射时,前一个 Observable 将被取消订阅(它是 switched出)

// Observable from array
Rx.Observable.from(arr)
    .concatMap(value =>
        // Put a 500 ms delay between each value
        Rx.Observable.timer(500).map(_ => value)
    )
    .flatMap(dt =>
        randomFunc(dt)
        .retryWhen(errs =>
            errs
            .do(err => console.error(err))
            // Retry at most 5 times
            .take(5)
            // Retry after 500ms
            .delay(500)
        )
    )
    .subscribe();

catch never actually logs any error, even though it seems to retry on error.

传递给 catch 的函数应该 return 一个 Observable 例如:

Observable.throw(new Error())
    .catch(e =>
        (console.error(e), Observable.of('backup value'))
    )
    .subscribe();

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-catch

有几件事需要注意。 retry() 运算符只是重新订阅它的源,所以如果你不想再次开始整个迭代,你可以 merge/concat 异步函数进入链。

Rx.Observable.from(arr)
  .concatMap(val => {
    let attempts = 0;

    return Rx.Observable.of(val)
      .delay(500)
      .concatMap(val => randomFunc(val)
        .catch((err, caught) => {
          console.log('log error');
          if (attempts++ === 1) {
            return Rx.Observable.of(err);
          } else {
            return caught;
          }
        })
      );

  })
  .subscribe(val => console.log(val));

function randomFunc(dt) {
  return Rx.Observable.create(observer => {
    if (dt === 'random') {
      observer.error(`error received ${dt}`);
    } else {
      observer.next(dt);
      observer.complete();
    }
  });
}

观看现场演示:https://jsbin.com/qacamab/7/edit?js,console

这会打印到控制台:

1
2
3
4
log error
log error
error received random
6
7
8
9
10

catch() 运算符是最重要的部分。它的选择器函数有两个参数:

  • err - 发生的错误
  • caught - 原始的 Observable。

如果我们从选择器函数 return caught 我们将重新订阅源 Observable(与 retry(1) 相同)。由于您想记录每条错误消息,我们必须使用 catch() 而不是 retry()。通过 returning Rx.Observable.of(err) 我们进一步传播错误,然后它会作为 next 通知被订阅者接收。我们也可以 return 只是 Observable.empty() 来简单地忽略错误。