轮询服务直到我得到响应或在特定时间段后超时

Poll a service until I get a response or time out after certain time period

要求 呼叫服务。

如果服务returns数据,将数据设置为变量。功能超过

如果该服务 returns data = null,则每 20 秒调用一次该服务,重复直到它 returns data= "a list or an object" 或调用该服务 2 分钟并停止.

我试过的 需要每 20 秒轮询一次此服务 getUrlById(id),直到我在 this.url 内收到响应或在 6 分钟后超时。

尝试了以下解决方案,但没有按预期工作。

pollUrl(id) {
    interval(2000).pipe(
      timeout(600000),
      takeWhile(() => this.url)
    ).subscribe(i => {
      this.service.getUrlById(id).subscribe(res =>{
        if(res && res["result"]){
          this.url = res["result"];
        }
      })
      })
  }

来自评论我试过的

在这里调用了虚拟服务,demo

这里是虚拟服务returns data = null。所以根据要求,我需要每 20 秒调用一次服务,直到 2 分钟。那是行不通的。

没有必要使用这段代码,我想达到要求。可以有不同的方法。

只需使用 find 运算符(找到第一个通过某些测试并发出 Docs 的值),这样 getUrlById 返回的 Observable 会在 response.result 时完成已定义,如下所示:

interval(2000)
  .pipe(
    exhaustMap(() => this.service.getUrlById(id)), // <== switch subscription to `getUrlById` 
    find(x => x && x.result && x.result.length > 0),
    timeout(6 * 60 * 1000)
  )
  .subscribe(i => { })

这里是 LIVE DEMO

可选择使用 expand,这样只要当前请求尚未完成,就不会发送进一步的请求。

const api$ = this.service.getUrlById(id);

api$.pipe(
    expand(x => x && x.result && x.result.length > 0 ? EMPTY : api$.pipe(delay(2000))),
    last(), // get the result
    timeout(6 * 60 * 1000)
)
.subscribe(i => { })

您是否尝试过使用 RetryWhen 运算符?

类似于:(link: https://stackblitz.com/edit/zhck3h?file=index.ts)

import { interval, of, throwError, zip, from } from 'rxjs';
import { mergeMap, retry, map, retryWhen, delay, take, tap } from 'rxjs/operators';

const randomCall = () => {
  const rand = Math.random() * 100;
  if (rand > 50) {
    console.log('has failed')
    throw 'error'
  } else {
    console.log('has succeed')
    return 'success';
  };
}
const source = of(1).pipe(
  map(() => randomCall()),
  take(1)
);

let retries = 5;
const example = source.pipe(
  retryWhen(errors => errors.pipe(
      delay(1 * 1000),
      mergeMap(error => retries-- > 0 ? of(error) : throwError('error'))
  ))
)

const subscribe = example.subscribe({
  next: val => console.log('success: ' + val),
  error: val => console.log(`retry exceeded!`)
});

首先,我想说 timeout 运算符在这种情况下是多余的。如果 source 没有在 600000ms 内发射,它应该会抛出一个错误,但这永远不会发生,因为 source 会每 2000ms 发射一次。

考虑到这一点,我的方法是:

interval(2000).pipe(
  switchMap(id => this.service.getUrlById(id)),
  filter(res && res["result"]),
  
  // if we got a response -> stop polling
  first(),

  // if we didn't get a response in `600000`ms -> stop polling
  takeUntil(timer(600000)),
)

它之前没有工作可能是因为在 this.url 被填充之前达到了 takeWhile(() => this.url),这意味着当 takeWhile 的回调函数被调用时,this.urlundefined(ar 任何虚假值),因此整个流将完成。

每当轮询出现时,通常采用 expand 运算符,一方面是因为它排除了请求花费的时间超过轮询周期并且最终有两个同时请求的情况。像

of(undefined).pipe(
  expand((result) =>
    result === undefined
      ? this.service.getUrlById(id).pipe(
          map((data) => data?.result),
          switchMap((result) =>
            result !== undefined
              ? of(result)
              : of(undefined).pipe(delay(20000)),
          ),
        )
      : EMPTY,
  ),
  filter((result) => result !== undefined),
  timeout(600000),
);