RxJS / Angular 6/7: delaying HTTP requests with mergeMap
I want to send requests with this code (I also tried forkJoin), but with a delay between the calls:
duplicateElement(id: string): Observable<any> {
  return this.http.get({ routeName: 'route_name', params: { id } });
}

duplicateElements(ids: string[]): Observable<any> {
  return from(ids).pipe(
    mergeMap(id => this.duplicateElement(id).pipe(delay(1000)))
  );
}
But .pipe(delay(1000)) does not work as I expected: I want each HTTP request to be sent 1000 ms after the previous one.
There are two options here.
Basic setup
// Named imports, so the unprefixed operators used in the pipelines resolve
import { of } from "rxjs";
import { concatAll, concatMap, delay, map, mergeMap, reduce, tap } from "rxjs/operators";

const startTime = new Date();

// Seconds elapsed since the script started
function getTimestamp() {
  return (new Date().getTime() - startTime.getTime()) / 1000;
}

const desiredDelay = 750;
const serviceDelay = 500;

// simulating service, you can ignore what's inside
const myService = (b: any) => {
  return of(Math.random()).pipe(
    // To simulate long running service
    delay(serviceDelay),
    // Log the timestamp after execution, should be ~ desiredDelay + serviceDelay, so by default 1250 ms for each emitted value
    tap(() => console.log(`${b} after service result, ${getTimestamp()}`)),
    // simulating the result
    map(() => "result" + b)
  );
};
Emit the values one by one after a delay, call the service as soon as each value arrives, and collect the results
of([1, 2, 3, 4, 5])
  .pipe(
    // See the initial values
    tap(console.log),
    // Split array into single values during emit
    // Collect observables and subscribe to next when previous completes
    concatAll(),
    // Emit each value as a sequence of observables with a desired delay
    concatMap(a => of(a).pipe(delay(desiredDelay))),
    // Call service on each value as soon as possible, do not care about the order
    mergeMap(a => myService(a)),
    // Reduce single values back into array
    // Reduces the values from source observable to a single value that's emitted when the source completes
    reduce<any>((acc, val) => [...acc, val], []),
    // See the result, not necessary
    tap(console.log)
  )
  .subscribe();
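As a rough check on the timing of this first option, the schedule can be worked out with plain arithmetic. The sketch below does not run RxJS. `optionOneTimeline` and `TimelineEntry` are hypothetical names, and it assumes the array is flattened at t = 0, that concatMap releases the k-th value at k * desiredDelay, and that the simulated service takes exactly serviceDelay.

```typescript
// Hypothetical helper, not part of RxJS: predicts when each value reaches the
// service and when its result arrives for the concatMap(delay) + mergeMap pipeline.
interface TimelineEntry {
  value: number;
  serviceCalledAt: number; // ms since subscription
  resultAt: number;        // ms since subscription
}

function optionOneTimeline(
  values: number[],
  desiredDelay: number,
  serviceDelay: number
): TimelineEntry[] {
  return values.map((value, index) => {
    // concatMap(a => of(a).pipe(delay(desiredDelay))) runs the delays back to back,
    // so the k-th value (1-based) is released at k * desiredDelay.
    const serviceCalledAt = (index + 1) * desiredDelay;
    // mergeMap subscribes to the service immediately, so calls may overlap.
    return { value, serviceCalledAt, resultAt: serviceCalledAt + serviceDelay };
  });
}

const optionOne = optionOneTimeline([1, 2, 3, 4, 5], 750, 500);
console.log(optionOne.map(e => e.resultAt)); // [1250, 2000, 2750, 3500, 4250]
```

Under these assumptions the first call also waits desiredDelay before it starts, and the reduced array is emitted at roughly 4250 ms.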
Call the service with one value, wait for the delay, then call the service with the next value
of([1, 2, 3, 4, 5])
  .pipe(
    // See the initial values
    tap(console.log),
    // Split array into single values during emit
    // Collect observables and subscribe to next when previous completes
    concatAll(),
    // Call the service
    // Map values to inner observable, subscribe and emit in order
    concatMap(a => myService(a).pipe(delay(desiredDelay))),
    // Reduce single values back into array
    // Reduces the values from source observable to a single value that's emitted when the source completes
    reduce<any>((acc, val) => [...acc, val], []),
    // See the result, not necessary
    tap(console.log)
  )
  .subscribe();
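The sequential variant can be checked the same way. This is only an arithmetic sketch, not RxJS code. `optionTwoTimeline` and `SequentialEntry` are hypothetical names, and it assumes each inner observable occupies one slot of serviceDelay + desiredDelay and that concatMap subscribes to the next inner observable only when the previous one completes.

```typescript
// Hypothetical helper, not part of RxJS: predicted schedule for
// concatMap(a => myService(a).pipe(delay(desiredDelay))).
interface SequentialEntry {
  value: number;
  serviceCalledAt: number; // ms, when the service is subscribed
  loggedAt: number;        // ms, when the tap inside the service logs
  emittedAt: number;       // ms, when the delayed result leaves concatMap
}

function optionTwoTimeline(
  values: number[],
  desiredDelay: number,
  serviceDelay: number
): SequentialEntry[] {
  // One inner observable = service time + trailing delay
  const slot = serviceDelay + desiredDelay;
  return values.map((value, index) => {
    // The next call starts only after the previous slot has completed
    const serviceCalledAt = index * slot;
    return {
      value,
      serviceCalledAt,
      loggedAt: serviceCalledAt + serviceDelay,
      emittedAt: serviceCalledAt + slot,
    };
  });
}

const optionTwo = optionTwoTimeline([1, 2, 3, 4, 5], 750, 500);
console.log(optionTwo.map(e => e.serviceCalledAt)); // [0, 1250, 2500, 3750, 5000]
console.log(optionTwo.map(e => e.emittedAt));       // [1250, 2500, 3750, 5000, 6250]
```

Here the first call starts immediately, calls never overlap, and the gap between one service result and the next call equals desiredDelay.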
How about this:
duplicateElements(ids: string[]): Observable<any> {
  return interval(1000).pipe(                  // start emitting every 1000ms
    take(ids.length),                          // limit emissions to length of array
    map(i => ids[i]),                          // map (change) emission to the array item @ index i
    mergeMap(id => this.duplicateElement(id))  // add the http request
  );
}
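To make the difference from the code in the question concrete, here is a small arithmetic sketch of when each request would start. It is not RxJS code. Both function names are hypothetical, and it assumes the HTTP observable is cold, i.e. the request is sent at the moment mergeMap subscribes to it.

```typescript
// Hypothetical helpers, not RxJS code: predicted request start times in ms.

// from(ids).pipe(mergeMap(id => request(id).pipe(delay(period)))):
// from() emits every id synchronously and mergeMap subscribes right away,
// so every request starts at t = 0; delay() only postpones the responses.
function startTimesWithInnerDelay(count: number): number[] {
  const starts: number[] = [];
  for (let i = 0; i < count; i++) {
    starts.push(0);
  }
  return starts;
}

// interval(period).pipe(take(count), map(i => ids[i]), mergeMap(request)):
// interval emits index i at (i + 1) * period, and each emission starts one request.
function startTimesWithInterval(count: number, period: number): number[] {
  const starts: number[] = [];
  for (let i = 0; i < count; i++) {
    starts.push((i + 1) * period);
  }
  return starts;
}

console.log(startTimesWithInnerDelay(3));     // [0, 0, 0]
console.log(startTimesWithInterval(3, 1000)); // [1000, 2000, 3000]
```

Note that with interval the first request waits one full period. If it should start immediately, timer(0, 1000) in place of interval(1000) is one option.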