使用多个请求捕获可观察对象的最后一个值

Capture last value of an observable with multiple requests

我有一个服务层负责处理数据并将其发送到使用相同值的组件。

我需要向 api 发出请求,只要值为 isProcessed == false 它就会每半秒尝试 10 次。

如果尝试失败,它只会再发出 3 次请求。

如果isProcessed == true,停止申请。

所有的逻辑都是内置的,但我无法输出最后一个可观察值。已发送所有回复。

所有请求都是由observables发送的,false和true都有,但我只需要最后一个。

这里所有请求响应都到达组件,而不仅仅是最后一个

负责访问API的服务:

public getPositionConsolidate(): Observable<PosicaoConsolidada> {       
    return this.http.get<PosicaoConsolidada>(`${environment.api.basePosicaoConsolidada}/consolidado`)
      .pipe(
        map(res => res),
        retryWhen(genericRetryStrategy()),
        shareReplay(1),
        catchError(err => {
            console.log('Error in Position Consolidate', err);
            return throwError(err);
        })
    )
}

负责处理数据并将其发送到组件的服务:

public positionConsolidate() {
    let subject = new BehaviorSubject<any>([]);
    this.api.getPositionConsolidate().subscribe(response => {
        if(response.hasProcessado == false) {
            for (let numberRequest = 0; numberRequest < 10; numberRequest++) {
                setTimeout(() => {
                   //subject.next(this.api.getPosicaoConsolidada().subscribe());
                   this.api.getPositionConsolidate().subscribe(res => {
                        subject.next(res)
                   })
                }, numberRequest * 500, numberRequest);
            }
        } else {
            retryWhen(genericRetryStrategy()),
            finalize(() => this.loadingService.loadingOff())
        }   
    })
    return subject.asObservable()
}

在组件中:

public ngOnInit() {
 this.coreState.positionConsolidate().subscribe(res => console.log(res))
}

你的问题最容易回答的部分是,如果你只是想从一个可观察到的最后一次发射,那么只需使用 last 运算符。但是,您编写内容的方式使其难以合并。以下将您的代码重构为单个流,没有任何非 rxjs 控制结构。

public positionConsolidate() {

  return this.api.getPositionConsolidate().pipe(
    concatMap(res => iif(() => res.hasProcessado,
      of(res),
      interval(500).pipe(
        take(10),
        concatMap(() => this.api.getPositionConsolidate())
      )
    )),
    retryWhen(genericRetryStrategy()),
    finalize(() => this.loadingService.loadingOff()),
    last()
  );
}

发生了什么事

  • 首先执行初始 api 调用。
  • 然后根据结果,要么...
    • Returns初始结果
    • 每 500 毫秒调用 api 10 次。
  • 实现 retryWhenfinalize.
  • 的功能
  • Returns 最后发出的结果。

也不要订阅可观察对象内部的可观察对象 - 这就是 concatMap 等高阶可观察对象的用途。