我如何处理 RxJS 中相互传递数据的子顺序 URL 调用?

How do i handle sub sequential URL calls in RxJS that pass data to each other?

我有一个服务调用,如果服务器忙于处理响应,可能 returns 给我一个队列 URL。

我正在 angular 中编写一项服务来处理这些类型的调用,并且我正在努力找出哪种 RXJS 6+ 操作员可以为我处理此问题。

这意味着我想做一个回退,如果对我的响应 returns 排队 URL,我将订阅这个电话,然后重试直到我得到答案。

等待时间最长可达 30 秒 (< insert frustrations here>)。

从 rxjs 文档页面可以看出,我需要使用 concatMap 运算符,并以某种方式 retry 调用直到我得到正确的响应?也许有一些 delay 运营商来限制调用量?

我找到了这个 snippet from https://www.learnrxjs.io/

提前致谢!

如果我的假设不正确,我会回来编辑它,但假设当您需要将请求重定向到队列时,您得到的负载看起来像这样:

{
    queue: true
    url: "http://myqueue.com"
}

与刚收到回复时相比:

{
    queue: false
    data: { "foo": "bar" }
}

重定向请求的方式可能如下所示

http.get<DataModel>('http://the-original-url').pipe(
    switchMap(dataModel=>{
        //did we get queued?
        if (dataModel.queue) {
            //make a subsequent request and retry it 3 times
            return http.get<QueueResponseModel>(dataModel.url).pipe(retry(3))
        } else {
            //return an observable that emits the desired data
            return of(dataModel.data)
        }
    })
)

这是一个递归调用结构,所以你需要写一个递归的可观察对象。您没有提供确切的响应结构,所以我无法给出确切的代码,但在较高级别上它看起来像这样:

getQueuedResponse<T>(url) {
  return this.http.get<T>(url).pipe( // fetch the first URL
    switchMap(res => 
      (res.queueUrl) // if queued (your actual queue indicator may be different)
        ? this.getQueuedResponse<T>(res.queueUrl) //then recurse (your actual next url may be different or it may be the original url again)
        : of(res))); // else break (what you actually return here may be different)
}

如果你想用一个简单的计时器,你可以添加延迟:

getQueuedResponse<T>(url) {
  return this.http.get<T>(url).pipe( // fetch the first URL
    switchMap(res => 
      (res.queueUrl) // if queued, recurse after 5 seconds
        ? timer(5000).pipe(switchMap(t => this.getQueuedResponse<T>(res.queueUrl))
        : of(res))); // else break
}

或者,如果您的需求略有不同,您可以一遍又一遍地调用相同的 URL,您可以将其视为轮询问题:

pollForResponse<T>(url) {
  return timer(0, 5000).pipe( // start right away then emit every 5 seconds
    switchMap(i => this.http.get<T>(url)), // request the URL
    takeWhile(r => !!r.queued), // keep taking while it's queued
    last() // only emit the last response
  );
}