RXJS 的超时运算符未按预期工作

Timeout Operator of RXJS not working as expected

我必须进行轮询,直到收到完成的响应。但我也想只进行 2 分钟的轮询并防止无限轮询,我正在使用超时运算符,但这似乎不起作用。

     interval(5000)
     .pipe(

    startWith(0),
    switchMap(() => this.someService.getQueueJob(jobId)),
    takeUntil(this.stopPolling),
    timeout(2*60000)
  )
  .subscribe((res) => {
    this.jobStatus = res.attributes.status;
    if (this.jobStatus === 'Completed') {
      this.fileUploadStatus = 'success';
      this.stopPolling.next();
    } else if (this.jobStatus === 'Failed') {
      this.fileUploadStatus = 'error';
      this.stopPolling.next();
    }
  });

我还需要在发生超时时抛出错误并通知用户。

timeout will be reset everytime there is an emission. So for each emission of the interval and the corresponding this.someService.getQueueJob() its reset. You could accomplish your requirement with an additional takeUntil with the timer 函数。

还有

  1. 你在问题中说超时2分钟,但代码只有分钟?
  2. 正如@Chris 在评论中提到的,您可以将 interval(5000) + startWith(0) 组合替换为 timer(0, 5000)
timer(0, 5000).pipe(
  switchMap(() => this.someService.getQueueJob(jobId)),
  takeUntil(this.stopPolling),
  takeUntil(timer(120000))
).subscribe(
  ...
);

编辑 1:将 interval(5000) + startWith(0) 替换为 timer(0, 5000)

您可以在启动时使用 timer 超时:

// set time for timeout
timer(2 * 60000).pipe(
  // trow error when timer emit event after 2 minutes
  map(() => { throw new TimeoutError() }),
  // put startWith operator to start before timer emit timeout event
  startWith(0),
  // switch to polling timer for every 5 second
  switchMap(() => timer(0, 5000)),
  // get date from http
  switchMap(() => this.someService.getQueueJob(jobId)),
  // stop manually the polling
  takeUntil(this.stopPolling),
).subscribe(
  (data) => console.log(data),
  (error) => {
    if (error instanceof TimeoutError)
      console.log('time out')
    else
      console.log('http error')
  }
);