如何使用 RxJS 调整重试超时时间?

How do I adjust timeout duration on retry using RxJS?

我正在使用最新的 Angular 以及 Typescript 和 RxJS 5。

Angular 目前已经使 RxJS 成为必需品。我主要使用 C# 超过 10 年,我非常习惯 Linq/Lambdas/fluent 语法,我认为它构成了 Reactive 的基础。

我想在重试时使用增加的超时值进行 Http get 调用,但我在查看如何执行此操作以及仍将所有内容保留在管道中(不使用外部状态)时遇到问题。

我知道我可以做到这一点,但它只会使用相同的超时值重试。

myHttpObservable.timeout(1000).retry(2);

RxJS 的文档在很多地方都很糟糕,在这里询问它只会让我的问题被删除,这很可悲......所以我被迫查看源代码。

有没有一种方法可以每次以增加的超时持续时间重试,同时保持管道中的状态?另外,我希望在第一次尝试时有一个初始超时。

我一开始尝试过类似的事情,但意识到令人困惑的 retryWhen 运算符并不是真正为我想要的而设计的:

myHttpObservable.timeout(1000).retryWhen((theSubject: Observable<Error>) => {
 return  aNewMyObservableCreatedinHere.timeout(2000);  
});

我知道我可以使用外部状态来完成此操作,但我基本上是在寻找一种优雅的解决方案,我认为这正是他们推动反应式编程风格的原因。

目前 RxJs5 最大的问题之一是文档。它真的很零散,还达不到以前的版本。通过查看 the documentation of RxJs4 you can see that .retryWhen() 已经有一个 example 用于构建可用的指数退避,可以轻松迁移到 RxJs5:

Rx.Observable.throw(new Error('splut'))
  .retryWhen(attempts => Rx.Observable.range(1, 3)
    .zip(attempts, i => i)
    .mergeMap(i => {
      console.log("delay retry by " + i + " second(s)");
      return Rx.Observable.timer(i * 1000);
    })
  ).subscribe();

根据我之前回答的评论中的额外输入,我一直在尝试使用 .expand() 运算符来解决您的要求:

I want to make a get with timeout duration = X and if it times out, then retry with timeout = X+1

以下代码段以 timeout = 500 开始,每次尝试超时都会增加 attempt * 500,直到达到 maxAttempts或已成功检索到结果:

getWithExpandingTimeout('http://reaches-max-attempts', 3)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );

getWithExpandingTimeout('http://eventually-finishes-within-timeout', 10)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );



/*
retrieve the given url and keep trying with an expanding 
timeout until it succeeds or maxAttempts has been reached
*/
function getWithExpandingTimeout(url, maxAttempts) {
  return get(url, 1)
    .expand(res => {
      if(res.error) {
        const nextAttempt = 1 + res.attempt;
        if(nextAttempt > maxAttempts) {
          // too many retry attempts done
          return Rx.Observable.throw(new Error(`Max attempts reached to retrieve url ${url}: ${res.error.message}`));
        }
        
        // retry next attempt
        return get(url, nextAttempt);
      }
      return Rx.Observable.empty(); // done with retrying, result has been emitted
    })
    .filter(res => !res.error);
}

/*
 retrieve info from server with timeout based on attempt
 NOTE: does not errors the stream so expand() keeps working
*/
function get(url, attempt) {
  return Rx.Observable.of(`result for ${url} returned after #${attempt} attempts`)
    .do(() => console.log(`starting attempt #${attempt} to retrieve ${url}`))
    .delay(5 * 500)
    .timeout(attempt * 500)
    .catch(error => Rx.Observable.of({ attempt: attempt, error }));
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

这是如何工作的

上游或由 .expand() 运算符生成的每个值都被发送到下游并用作 .expand() 运算符的输入。这一直持续到没有进一步的值被发出。通过利用此行为,当发射包含内部 .error 值时,.get() 函数会尝试增加 re-attempted。

.get() 函数不会抛出错误,否则我们需要在 .expand() 中捕获它,否则递归将意外中断。

当超过 maxAttempts 时,.expand() 运算符会抛出一个停止递归的错误。当发射中不存在 .error 属性 时,我们期望这是一个成功的结果并发射一个空的 Observable,停止递归。

注意:它使用 .filter() 来消除基于 .error 属性 的所有排放,因为 .expand() 产生的所有值也向下游排放,但是这些 .error 值是内部状态。

backoff-rxjs npm package to deal with this case called retryBackoff. I described it in the article at blog.angularindepth.com中有一个运算符,但简而言之就是:

source.pipe(
  retryWhen(errors => errors.pipe(
      concatMap((error, iteration) => 
          timer(Math.pow(2, iteration) * initialInterval, maxInterval))));

这里是 the sources 更可定制的版本。

我使用 delayWhen 创建一个通知函数,使 retryWhen 在每次错误发生后延迟增加时发出。您可以通过更改用于计算重试之间延迟的公式来选择不同的时间序列。请参阅下面的两个示例公式,了解几何和指数退避重试策略。另请注意我如何限制重试次数并在达到该限制时抛出错误。

const initialDelay = 1000;
const maxRetries = 4;
throwError('oops')
  .pipe(
    retryWhen(errors =>
      errors.pipe(
        delayWhen((_,i) => {
          // const delay = (i+1)*initialDelay;            // geometric
          const delay = Math.pow(2,i)*initialDelay;       // exponential
          console.log(`retrying after ${delay} msec...`);
          return timer(delay);
        }),
        take(maxRetries),
        concat(throwError('number of retries exceeded')))))
  .subscribe(
    x => console.log('value:', x),
    e => console.error('error:', e)
  );

我认为退避是不正确的。这个延迟是到下一个请求的错误,不是请求到下一个请求,rxjs中的超时说源花费时间大于超时,我想更改请求的超时,而不是添加之间的延迟错误和下一个请求,谁知道怎么做?

像那样:
请求:8s,超时:3s,超时变化:3 * 2 * retryIndex
expect:这个请求会重试两次,最后耗时8s。
Backoff:不使用超时,这个请求会在第一次成功。如果请求有另一个错误,下一个请求将在 3 * 2 * retryIndex.

之后发送