使用多个请求捕获可观察对象的最后一个值
Capture last value of an observable with multiple requests
我有一个服务层负责处理数据并将其发送到使用相同值的组件。
我需要向 api 发出请求,只要值为 isProcessed == false
它就会每半秒尝试 10 次。
如果尝试失败,它只会再发出 3 次请求。
如果isProcessed == true
,停止申请。
所有的逻辑都是内置的,但我无法输出最后一个可观察值。已发送所有回复。
所有请求都是由observables发送的,false和true都有,但我只需要最后一个。
这里所有请求响应都到达组件,而不仅仅是最后一个
负责访问API的服务:
public getPositionConsolidate(): Observable<PosicaoConsolidada> {
return this.http.get<PosicaoConsolidada>(`${environment.api.basePosicaoConsolidada}/consolidado`)
.pipe(
map(res => res),
retryWhen(genericRetryStrategy()),
shareReplay(1),
catchError(err => {
console.log('Error in Position Consolidate', err);
return throwError(err);
})
)
}
负责处理数据并将其发送到组件的服务:
public positionConsolidate() {
let subject = new BehaviorSubject<any>([]);
this.api.getPositionConsolidate().subscribe(response => {
if(response.hasProcessado == false) {
for (let numberRequest = 0; numberRequest < 10; numberRequest++) {
setTimeout(() => {
//subject.next(this.api.getPosicaoConsolidada().subscribe());
this.api.getPositionConsolidate().subscribe(res => {
subject.next(res)
})
}, numberRequest * 500, numberRequest);
}
} else {
retryWhen(genericRetryStrategy()),
finalize(() => this.loadingService.loadingOff())
}
})
return subject.asObservable()
}
在组件中:
public ngOnInit() {
this.coreState.positionConsolidate().subscribe(res => console.log(res))
}
你的问题最容易回答的部分是,如果你只是想从一个可观察到的最后一次发射,那么只需使用 last 运算符。但是,您编写内容的方式使其难以合并。以下将您的代码重构为单个流,没有任何非 rxjs 控制结构。
public positionConsolidate() {
return this.api.getPositionConsolidate().pipe(
concatMap(res => iif(() => res.hasProcessado,
of(res),
interval(500).pipe(
take(10),
concatMap(() => this.api.getPositionConsolidate())
)
)),
retryWhen(genericRetryStrategy()),
finalize(() => this.loadingService.loadingOff()),
last()
);
}
发生了什么事
- 首先执行初始 api 调用。
- 然后根据结果,要么...
- Returns初始结果
- 每 500 毫秒调用 api 10 次。
- 实现 retryWhen 和 finalize.
的功能
- Returns 最后发出的结果。
也不要订阅可观察对象内部的可观察对象 - 这就是 concatMap 等高阶可观察对象的用途。
我有一个服务层负责处理数据并将其发送到使用相同值的组件。
我需要向 api 发出请求,只要值为 isProcessed == false
它就会每半秒尝试 10 次。
如果尝试失败,它只会再发出 3 次请求。
如果isProcessed == true
,停止申请。
所有的逻辑都是内置的,但我无法输出最后一个可观察值。已发送所有回复。
所有请求都是由observables发送的,false和true都有,但我只需要最后一个。
这里所有请求响应都到达组件,而不仅仅是最后一个
负责访问API的服务:
public getPositionConsolidate(): Observable<PosicaoConsolidada> {
return this.http.get<PosicaoConsolidada>(`${environment.api.basePosicaoConsolidada}/consolidado`)
.pipe(
map(res => res),
retryWhen(genericRetryStrategy()),
shareReplay(1),
catchError(err => {
console.log('Error in Position Consolidate', err);
return throwError(err);
})
)
}
负责处理数据并将其发送到组件的服务:
public positionConsolidate() {
let subject = new BehaviorSubject<any>([]);
this.api.getPositionConsolidate().subscribe(response => {
if(response.hasProcessado == false) {
for (let numberRequest = 0; numberRequest < 10; numberRequest++) {
setTimeout(() => {
//subject.next(this.api.getPosicaoConsolidada().subscribe());
this.api.getPositionConsolidate().subscribe(res => {
subject.next(res)
})
}, numberRequest * 500, numberRequest);
}
} else {
retryWhen(genericRetryStrategy()),
finalize(() => this.loadingService.loadingOff())
}
})
return subject.asObservable()
}
在组件中:
public ngOnInit() {
this.coreState.positionConsolidate().subscribe(res => console.log(res))
}
你的问题最容易回答的部分是,如果你只是想从一个可观察到的最后一次发射,那么只需使用 last 运算符。但是,您编写内容的方式使其难以合并。以下将您的代码重构为单个流,没有任何非 rxjs 控制结构。
public positionConsolidate() {
return this.api.getPositionConsolidate().pipe(
concatMap(res => iif(() => res.hasProcessado,
of(res),
interval(500).pipe(
take(10),
concatMap(() => this.api.getPositionConsolidate())
)
)),
retryWhen(genericRetryStrategy()),
finalize(() => this.loadingService.loadingOff()),
last()
);
}
发生了什么事
- 首先执行初始 api 调用。
- 然后根据结果,要么...
- Returns初始结果
- 每 500 毫秒调用 api 10 次。
- 实现 retryWhen 和 finalize. 的功能
- Returns 最后发出的结果。
也不要订阅可观察对象内部的可观察对象 - 这就是 concatMap 等高阶可观察对象的用途。