Rxjs:每隔 X 秒重复一个 Ajax 调用,但等待最后一个调用完成
Rxjs: Repeat an Ajax Call Every X Seconds, but Wait for the Last One to Complete
我想在订阅自动刷新 Observable 时每隔 x 秒查询一次 API,确保最后一个请求已完成,然后再发送另一个请求。
let autoRefresher = new Observable().exhaustMap(() => Observable.defer(() => {
return someService.returningAPromise();
}).timeout(refreshIntervalInMs).repeat());
有更好的方法吗?我如何在不每次都创建新的可观察对象的情况下更新刷新间隔?
我会这样做:
import {Observable} from 'rxjs';
function doRequest() {
if (Math.random() < 0.25) {
return Observable.of('HTTP Response').delay(3000);
} else {
return Observable.of('HTTP Response');
}
}
let autoRefresher = Observable.timer(0, 1000)
.exhaustMap(doRequest)
.subscribe(response => {
console.log(response);
});
查看现场演示:http://plnkr.co/edit/7HAib10r6Vdl1x2U2wFS
这会随机产生 3 秒的延迟。运算符 timer()
定期发出一个值。然后 exhaustMap() 订阅前面的 Observable 并忽略所有发出的 Observable,直到当前 Observable 完成。所以 timer()
正在发出值,但这些值被 exhaust()
.
忽略
顺便说一句,请注意我使用的是 TypeScript。
我看到这是一个很老的问题,我只是想提出一个可能更“稳定”的解决方案。这种方法的问题是可能 windows 由于计时器以固定的 1s 间隔运行而没有发出请求。
例如:
Interval 1s, Request Duration < 1s: 每 1 秒发出一个请求,但是如果请求需要 400 毫秒才能完成,那么到下一个请求开始只需要 600 毫秒(而不是 1 秒)
Interval 1s, Request Duration 1,5s: Request starts at the beginning, in next interval request is still 运行ning so exhaustMap filter it, and in 3rd interval request will 运行 again (500ms after previous request)
在我的示例中,请求将总是等待 1 秒,然后再次 运行ning
import { delay, flatMap, repeat, takeUntil } from 'rxjs/operators';
import { of,Subject } from 'rxjs';
function doRequest() {
if (Math.random() < 0.25) {
return Observable.of('HTTP Response').delay(3000);
} else {
return Observable.of('HTTP Response');
}
}
const subject: Subject<boolean> = new Subject();
of(null)
.pipe(
// can be used to cancel
takeUntil(subject),
// does request
flatMap(doRequest),
// delays by 1second
delay(1000),
// repeat steps above
repeat(),
)
.subscribe(response => {
console.log(respons);
});
重要的是 takeUntil(..) 是 pipe() 中的最后一次调用。否则 observables 可能不会被取消。参见 https://cartant.medium.com/rxjs-avoiding-takeuntil-leaks-fb5182d047ef
我想在订阅自动刷新 Observable 时每隔 x 秒查询一次 API,确保最后一个请求已完成,然后再发送另一个请求。
let autoRefresher = new Observable().exhaustMap(() => Observable.defer(() => {
return someService.returningAPromise();
}).timeout(refreshIntervalInMs).repeat());
有更好的方法吗?我如何在不每次都创建新的可观察对象的情况下更新刷新间隔?
我会这样做:
import {Observable} from 'rxjs';
function doRequest() {
if (Math.random() < 0.25) {
return Observable.of('HTTP Response').delay(3000);
} else {
return Observable.of('HTTP Response');
}
}
let autoRefresher = Observable.timer(0, 1000)
.exhaustMap(doRequest)
.subscribe(response => {
console.log(response);
});
查看现场演示:http://plnkr.co/edit/7HAib10r6Vdl1x2U2wFS
这会随机产生 3 秒的延迟。运算符 timer()
定期发出一个值。然后 exhaustMap() 订阅前面的 Observable 并忽略所有发出的 Observable,直到当前 Observable 完成。所以 timer()
正在发出值,但这些值被 exhaust()
.
顺便说一句,请注意我使用的是 TypeScript。
我看到这是一个很老的问题,我只是想提出一个可能更“稳定”的解决方案。这种方法的问题是可能 windows 由于计时器以固定的 1s 间隔运行而没有发出请求。
例如:
Interval 1s, Request Duration < 1s: 每 1 秒发出一个请求,但是如果请求需要 400 毫秒才能完成,那么到下一个请求开始只需要 600 毫秒(而不是 1 秒) Interval 1s, Request Duration 1,5s: Request starts at the beginning, in next interval request is still 运行ning so exhaustMap filter it, and in 3rd interval request will 运行 again (500ms after previous request)
在我的示例中,请求将总是等待 1 秒,然后再次 运行ning
import { delay, flatMap, repeat, takeUntil } from 'rxjs/operators';
import { of,Subject } from 'rxjs';
function doRequest() {
if (Math.random() < 0.25) {
return Observable.of('HTTP Response').delay(3000);
} else {
return Observable.of('HTTP Response');
}
}
const subject: Subject<boolean> = new Subject();
of(null)
.pipe(
// can be used to cancel
takeUntil(subject),
// does request
flatMap(doRequest),
// delays by 1second
delay(1000),
// repeat steps above
repeat(),
)
.subscribe(response => {
console.log(respons);
});
重要的是 takeUntil(..) 是 pipe() 中的最后一次调用。否则 observables 可能不会被取消。参见 https://cartant.medium.com/rxjs-avoiding-takeuntil-leaks-fb5182d047ef