我如何处理 RxJS 中相互传递数据的子顺序 URL 调用?
How do i handle sub sequential URL calls in RxJS that pass data to each other?
我有一个服务调用,如果服务器忙于处理响应,可能 returns 给我一个队列 URL。
我正在 angular 中编写一项服务来处理这些类型的调用,并且我正在努力找出哪种 RXJS 6+
操作员可以为我处理此问题。
这意味着我想做一个回退,如果对我的响应 returns 排队 URL,我将订阅这个电话,然后重试直到我得到答案。
等待时间最长可达 30 秒 (< insert frustrations here>
)。
从 rxjs 文档页面可以看出,我需要使用 concatMap
运算符,并以某种方式 retry
调用直到我得到正确的响应?也许有一些 delay
运营商来限制调用量?
我找到了这个 snippet from https://www.learnrxjs.io/。
提前致谢!
如果我的假设不正确,我会回来编辑它,但假设当您需要将请求重定向到队列时,您得到的负载看起来像这样:
{
queue: true
url: "http://myqueue.com"
}
与刚收到回复时相比:
{
queue: false
data: { "foo": "bar" }
}
重定向请求的方式可能如下所示
http.get<DataModel>('http://the-original-url').pipe(
switchMap(dataModel=>{
//did we get queued?
if (dataModel.queue) {
//make a subsequent request and retry it 3 times
return http.get<QueueResponseModel>(dataModel.url).pipe(retry(3))
} else {
//return an observable that emits the desired data
return of(dataModel.data)
}
})
)
这是一个递归调用结构,所以你需要写一个递归的可观察对象。您没有提供确切的响应结构,所以我无法给出确切的代码,但在较高级别上它看起来像这样:
getQueuedResponse<T>(url) {
return this.http.get<T>(url).pipe( // fetch the first URL
switchMap(res =>
(res.queueUrl) // if queued (your actual queue indicator may be different)
? this.getQueuedResponse<T>(res.queueUrl) //then recurse (your actual next url may be different or it may be the original url again)
: of(res))); // else break (what you actually return here may be different)
}
如果你想用一个简单的计时器,你可以添加延迟:
getQueuedResponse<T>(url) {
return this.http.get<T>(url).pipe( // fetch the first URL
switchMap(res =>
(res.queueUrl) // if queued, recurse after 5 seconds
? timer(5000).pipe(switchMap(t => this.getQueuedResponse<T>(res.queueUrl))
: of(res))); // else break
}
或者,如果您的需求略有不同,您可以一遍又一遍地调用相同的 URL,您可以将其视为轮询问题:
pollForResponse<T>(url) {
return timer(0, 5000).pipe( // start right away then emit every 5 seconds
switchMap(i => this.http.get<T>(url)), // request the URL
takeWhile(r => !!r.queued), // keep taking while it's queued
last() // only emit the last response
);
}
我有一个服务调用,如果服务器忙于处理响应,可能 returns 给我一个队列 URL。
我正在 angular 中编写一项服务来处理这些类型的调用,并且我正在努力找出哪种 RXJS 6+
操作员可以为我处理此问题。
这意味着我想做一个回退,如果对我的响应 returns 排队 URL,我将订阅这个电话,然后重试直到我得到答案。
等待时间最长可达 30 秒 (< insert frustrations here>
)。
从 rxjs 文档页面可以看出,我需要使用 concatMap
运算符,并以某种方式 retry
调用直到我得到正确的响应?也许有一些 delay
运营商来限制调用量?
我找到了这个 snippet from https://www.learnrxjs.io/。
提前致谢!
如果我的假设不正确,我会回来编辑它,但假设当您需要将请求重定向到队列时,您得到的负载看起来像这样:
{
queue: true
url: "http://myqueue.com"
}
与刚收到回复时相比:
{
queue: false
data: { "foo": "bar" }
}
重定向请求的方式可能如下所示
http.get<DataModel>('http://the-original-url').pipe(
switchMap(dataModel=>{
//did we get queued?
if (dataModel.queue) {
//make a subsequent request and retry it 3 times
return http.get<QueueResponseModel>(dataModel.url).pipe(retry(3))
} else {
//return an observable that emits the desired data
return of(dataModel.data)
}
})
)
这是一个递归调用结构,所以你需要写一个递归的可观察对象。您没有提供确切的响应结构,所以我无法给出确切的代码,但在较高级别上它看起来像这样:
getQueuedResponse<T>(url) {
return this.http.get<T>(url).pipe( // fetch the first URL
switchMap(res =>
(res.queueUrl) // if queued (your actual queue indicator may be different)
? this.getQueuedResponse<T>(res.queueUrl) //then recurse (your actual next url may be different or it may be the original url again)
: of(res))); // else break (what you actually return here may be different)
}
如果你想用一个简单的计时器,你可以添加延迟:
getQueuedResponse<T>(url) {
return this.http.get<T>(url).pipe( // fetch the first URL
switchMap(res =>
(res.queueUrl) // if queued, recurse after 5 seconds
? timer(5000).pipe(switchMap(t => this.getQueuedResponse<T>(res.queueUrl))
: of(res))); // else break
}
或者,如果您的需求略有不同,您可以一遍又一遍地调用相同的 URL,您可以将其视为轮询问题:
pollForResponse<T>(url) {
return timer(0, 5000).pipe( // start right away then emit every 5 seconds
switchMap(i => this.http.get<T>(url)), // request the URL
takeWhile(r => !!r.queued), // keep taking while it's queued
last() // only emit the last response
);
}