Rxjs:每隔 X 秒重复一个 Ajax 调用,但等待最后一个调用完成

Rxjs: Repeat an Ajax Call Every X Seconds, but Wait for the Last One to Complete

我想在订阅自动刷新 Observable 时每隔 x 秒查询一次 API,确保最后一个请求已完成,然后再发送另一个请求。

let autoRefresher = new Observable().exhaustMap(() => Observable.defer(() => {
    return someService.returningAPromise();
}).timeout(refreshIntervalInMs).repeat());

有更好的方法吗?我如何在不每次都创建新的可观察对象的情况下更新刷新间隔?

我会这样做:

import {Observable} from 'rxjs';

function doRequest() {
  if (Math.random() < 0.25) {
    return Observable.of('HTTP Response').delay(3000);
  } else {
    return Observable.of('HTTP Response');
  }
}

let autoRefresher = Observable.timer(0, 1000)
  .exhaustMap(doRequest)
  .subscribe(response => {
    console.log(response);
  });

查看现场演示:http://plnkr.co/edit/7HAib10r6Vdl1x2U2wFS

这会随机产生 3 秒的延迟。运算符 timer() 定期发出一个值。然后 exhaustMap() 订阅前面的 Observable 并忽略所有发出的 Observable,直到当前 Observable 完成。所以 timer() 正在发出值,但这些值被 exhaust().

忽略

顺便说一句,请注意我使用的是 TypeScript。

我看到这是一个很老的问题,我只是想提出一个可能更“稳定”的解决方案。这种方法的问题是可能 windows 由于计时器以固定的 1s 间隔运行而没有发出请求。

例如:

Interval 1s, Request Duration < 1s: 每 1 秒发出一个请求,但是如果请求需要 400 毫秒才能完成,那么到下一个请求开始只需要 600 毫秒(而不是 1 秒) Interval 1s, Request Duration 1,5s: Request starts at the beginning, in next interval request is still 运行ning so exhaustMap filter it, and in 3rd interval request will 运行 again (500ms after previous request)

在我的示例中,请求将总是等待 1 秒,然后再次 运行ning

import { delay, flatMap, repeat, takeUntil } from 'rxjs/operators';
import { of,Subject } from 'rxjs';

function doRequest() {
    if (Math.random() < 0.25) {
        return Observable.of('HTTP Response').delay(3000);
    } else {
        return Observable.of('HTTP Response');
    }
}


const subject: Subject<boolean> = new Subject();
of(null)
    .pipe(
        // can be used to cancel
        takeUntil(subject),
        // does request
        flatMap(doRequest),
        // delays by 1second
        delay(1000),
        // repeat steps above
        repeat(),
    )
    .subscribe(response => {
        console.log(respons);
    });

重要的是 takeUntil(..) 是 pipe() 中的最后一次调用。否则 observables 可能不会被取消。参见 https://cartant.medium.com/rxjs-avoiding-takeuntil-leaks-fb5182d047ef