在 rxjs 中排队 http 调用
Queue http calls in rxjs
我正在开发一个会话服务来检查授权令牌是否过期。如果是,它会调用刷新令牌。在此请求期间,所有传入请求都应排队,并在请求完成后发出。之后,所有传入的请求都可以不排队通过,直到令牌再次过期。我为此画了一个弹珠图:
1. ---a---b---c--d-----e--
2. -t-------f------t------
3. ---a---b---------cd-e--
我将 1. 命名为 incoming$
Observable,2. 是 valve$
- 如果它是 true,请求可以通过,如果它是 false,它们应该被排队。当它变为真时,排队的人被解雇。
到目前为止我做了什么?我认为这应该通过添加一个名为 receiver$
的中间 Observable 来完成,它会根据 valve$
更改其值。当valve$
为真时,它只是return一个简单的主题,如果它为假,它return是一个能够记录价值的主题。
receiver$ = valve.pipe(
map((value) => {
if (value) {
return new Subject();
} else {
return (new Subject()).pipe(
shareReplay(),
);
}
})
);
然后 incoming$
中获得的每个新值都应添加到 recevier$
中的当前可观察值:
incoming$.pipe(
combineLatest(receiver$),
).subscribe((incomingValue, recevier) => {
recevier.next(incomingValue);
});
这是我无法理解的部分。每当 valve 变为真时,我都需要 receiver$
中的最后两个值。倒数第二个将保持队列,最后一个将保持活动主题。通过合并它们,我可以实现我的目标。我不知道如何实现这一点以及如何管理订阅。此外,对于这样一个看似简单的用例,这看起来过于复杂。
实现此行为的最佳方式是什么?
您可以按照这些思路考虑解决方案。
首先你创建一个 Subject,通过它发出你想要发出的所有请求
const requests$ = new Subject<Observable<any>>()
然后创建一个 Subject,通过它传达 valve 的状态,即您是可以立即执行请求还是必须缓冲它
const valve$ = new Subject<boolean>();
现在您可以创建一个仅在 阀 打开时才传递请求的流,即如果 valve$
发出的最后一个值是 true
const openStream$ = valve$.pipe(
switchMap(valve => {
if (valve) {
return requests$;
} else {
return empty();
}
})
);
您还可以创建一个流,在 valve 关闭时缓冲所有请求
const bufferedStream$ = requests$.pipe(
bufferToggle(valve$.pipe(filter(valve => !valve)), () => valve$.pipe(filter(valve => valve))),
mergeMap(bufferedCalls => bufferedCalls)
)
现在您所要做的就是 merge
openStream$
和 bufferedStream$
以及 subscribe
到结果流,就像这样
merge(openStream$, bufferedStream$).pipe(
mergeMap(request => request)
)
.subscribe(httpCallResult => {// do stuff})
我用以下数据测试了这个解决方案,用字符串的 Observables 模拟真实的 http 调用
const requests$ = new Subject<Observable<string>>();
setTimeout(() => {requests$.next(of('A'))}, 50);
setTimeout(() => {requests$.next(of('B'))}, 60);
setTimeout(() => {requests$.next(of('C'))}, 100);
setTimeout(() => {requests$.next(of('D'))}, 110);
setTimeout(() => {requests$.next(of('E'))}, 130);
setTimeout(() => {requests$.next(of('F'))}, 250);
setTimeout(() => {requests$.next(of('G'))}, 260);
setTimeout(() => {requests$.next(of('H'))}, 300);
setTimeout(() => {requests$.next(of('I'))}, 310);
setTimeout(() => {requests$.next(of('L'))}, 330);
const valve$ = new Subject<boolean>();
setTimeout(() => {valve$.next(true)}, 30);
setTimeout(() => {valve$.next(false)}, 80);
setTimeout(() => {valve$.next(true)}, 120);
setTimeout(() => {valve$.next(false)}, 200);
setTimeout(() => {valve$.next(true)}, 290);
您只需使用 concatMap
即可完成此操作,它根据值形式 valve$
合并两个不同的流。请注意,这需要 valve$
和 incoming$
都与 share()
共享。
valve$
.pipe(
concatMap(v => v
? incoming$.pipe(takeUntil(valve$))
: incoming$
.pipe(
takeUntil(valve$),
bufferCount(Number.POSITIVE_INFINITY),
mergeAll(),
)
),
)
.subscribe(console.log)
现场演示:https://stackblitz.com/edit/rxjs6-demo-d3bsxb?file=index.ts
我正在开发一个会话服务来检查授权令牌是否过期。如果是,它会调用刷新令牌。在此请求期间,所有传入请求都应排队,并在请求完成后发出。之后,所有传入的请求都可以不排队通过,直到令牌再次过期。我为此画了一个弹珠图:
1. ---a---b---c--d-----e--
2. -t-------f------t------
3. ---a---b---------cd-e--
我将 1. 命名为 incoming$
Observable,2. 是 valve$
- 如果它是 true,请求可以通过,如果它是 false,它们应该被排队。当它变为真时,排队的人被解雇。
到目前为止我做了什么?我认为这应该通过添加一个名为 receiver$
的中间 Observable 来完成,它会根据 valve$
更改其值。当valve$
为真时,它只是return一个简单的主题,如果它为假,它return是一个能够记录价值的主题。
receiver$ = valve.pipe(
map((value) => {
if (value) {
return new Subject();
} else {
return (new Subject()).pipe(
shareReplay(),
);
}
})
);
然后 incoming$
中获得的每个新值都应添加到 recevier$
中的当前可观察值:
incoming$.pipe(
combineLatest(receiver$),
).subscribe((incomingValue, recevier) => {
recevier.next(incomingValue);
});
这是我无法理解的部分。每当 valve 变为真时,我都需要 receiver$
中的最后两个值。倒数第二个将保持队列,最后一个将保持活动主题。通过合并它们,我可以实现我的目标。我不知道如何实现这一点以及如何管理订阅。此外,对于这样一个看似简单的用例,这看起来过于复杂。
实现此行为的最佳方式是什么?
您可以按照这些思路考虑解决方案。
首先你创建一个 Subject,通过它发出你想要发出的所有请求
const requests$ = new Subject<Observable<any>>()
然后创建一个 Subject,通过它传达 valve 的状态,即您是可以立即执行请求还是必须缓冲它
const valve$ = new Subject<boolean>();
现在您可以创建一个仅在 阀 打开时才传递请求的流,即如果 valve$
发出的最后一个值是 true
const openStream$ = valve$.pipe(
switchMap(valve => {
if (valve) {
return requests$;
} else {
return empty();
}
})
);
您还可以创建一个流,在 valve 关闭时缓冲所有请求
const bufferedStream$ = requests$.pipe(
bufferToggle(valve$.pipe(filter(valve => !valve)), () => valve$.pipe(filter(valve => valve))),
mergeMap(bufferedCalls => bufferedCalls)
)
现在您所要做的就是 merge
openStream$
和 bufferedStream$
以及 subscribe
到结果流,就像这样
merge(openStream$, bufferedStream$).pipe(
mergeMap(request => request)
)
.subscribe(httpCallResult => {// do stuff})
我用以下数据测试了这个解决方案,用字符串的 Observables 模拟真实的 http 调用
const requests$ = new Subject<Observable<string>>();
setTimeout(() => {requests$.next(of('A'))}, 50);
setTimeout(() => {requests$.next(of('B'))}, 60);
setTimeout(() => {requests$.next(of('C'))}, 100);
setTimeout(() => {requests$.next(of('D'))}, 110);
setTimeout(() => {requests$.next(of('E'))}, 130);
setTimeout(() => {requests$.next(of('F'))}, 250);
setTimeout(() => {requests$.next(of('G'))}, 260);
setTimeout(() => {requests$.next(of('H'))}, 300);
setTimeout(() => {requests$.next(of('I'))}, 310);
setTimeout(() => {requests$.next(of('L'))}, 330);
const valve$ = new Subject<boolean>();
setTimeout(() => {valve$.next(true)}, 30);
setTimeout(() => {valve$.next(false)}, 80);
setTimeout(() => {valve$.next(true)}, 120);
setTimeout(() => {valve$.next(false)}, 200);
setTimeout(() => {valve$.next(true)}, 290);
您只需使用 concatMap
即可完成此操作,它根据值形式 valve$
合并两个不同的流。请注意,这需要 valve$
和 incoming$
都与 share()
共享。
valve$
.pipe(
concatMap(v => v
? incoming$.pipe(takeUntil(valve$))
: incoming$
.pipe(
takeUntil(valve$),
bufferCount(Number.POSITIVE_INFINITY),
mergeAll(),
)
),
)
.subscribe(console.log)
现场演示:https://stackblitz.com/edit/rxjs6-demo-d3bsxb?file=index.ts