RxJS 每秒拨打电话不超过一次,但不要丢失任何电话
RxJS Make call no more than once per second, but don't lose any calls
基本上我想创建一个队列。像
const queue = new BehaviorSubject([])
queue.subscribe((args) => someCall(args))
我可能打电话的地方
queue.next({arg1, arg2, arg3})
在几个地方,有时很快一个接一个。我不能使用 throttle
或 debounce
因为我不能丢失中间调用。我需要每次调用都被调用,但每秒不超过 1 个。如果两个人要在一秒钟之内开火,那么一个人就必须等待 1 秒钟。如果一秒内有3个开火,一个会等一秒,另一个会等2秒。
您可以使用 completion of an observable in combination with combineAll.
combineAll 将发出下一个可观察对象,当前一个已完成时
1.创建您的主题
const source$$ = new Subject();
2。提供一个将您的值映射到 1000 毫秒后完成的 Observable 的函数
const timeCompletedSource$ = (time) => (value) => Observable.create(observer => observer.next(v)).pipe(
takeUntil(timer(time))
);
你不需要让函数时间动态化,我只是做了 (time) => (value) => ... 因为我想写一个像 throttle(1000) 这样的运算符有一个动态的时间范围。你可以只写 (value) => Observable... 如果你想要一个静态时间
3。使用您的函数将您的值映射到时间盒内的可观察对象,并将所有可观察对象合并到 concatAll
中
const result$ = source$$.pipe(
map(timeCompletedSource$(time))
concatAll()
);
您找到了 运行 stackblitz here
专业版:制作自定义运算符
const fastThrottle = (time) => (source) => source.pipe(
map(timeCompletedSource$(1000)),
concatAll()
)
const result$ = source$$.pipe(
fastThrottle(1000)
);
我最近发现自己处于同样的情况。
api 我正在消费的每秒只能接受 4 个请求。
这是我想出来的。
一个 rateLimit 管道
import { asyncScheduler, BehaviorSubject, timer, MonoTypeOperatorFunction, Observable } from 'rxjs'
import { filter, map, mergeMap, take } from 'rxjs/operators'
export function rateLimit<T>(
count: number,
slidingWindowTime: number,
scheduler = asyncScheduler,
): MonoTypeOperatorFunction<T> {
let tokens = count
const tokenChanged = new BehaviorSubject(tokens)
const consumeToken = () => tokenChanged.next(--tokens)
const renewToken = () => tokenChanged.next(++tokens)
const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))
return mergeMap<T, Observable<T>>((value: T) =>
availableTokens.pipe(
take(1),
map(() => {
consumeToken()
timer(slidingWindowTime, scheduler).subscribe(renewToken)
return value
}),
),
)
}
而且你可以像这样使用它。
我想从 api 中获取 contractIds$ 中的所有合同。
我只想每 1000 毫秒发送 4 个请求
const contracts$ = contractIds$.pipe(
rateLimit(4, 1000),
mergeMap(contract => this.get(contract.DocumentNumber)),
)
也许这会对您有所帮助:)
基本上我想创建一个队列。像
const queue = new BehaviorSubject([])
queue.subscribe((args) => someCall(args))
我可能打电话的地方
queue.next({arg1, arg2, arg3})
在几个地方,有时很快一个接一个。我不能使用 throttle
或 debounce
因为我不能丢失中间调用。我需要每次调用都被调用,但每秒不超过 1 个。如果两个人要在一秒钟之内开火,那么一个人就必须等待 1 秒钟。如果一秒内有3个开火,一个会等一秒,另一个会等2秒。
您可以使用 completion of an observable in combination with combineAll.
combineAll 将发出下一个可观察对象,当前一个已完成时
1.创建您的主题
const source$$ = new Subject();
2。提供一个将您的值映射到 1000 毫秒后完成的 Observable 的函数
const timeCompletedSource$ = (time) => (value) => Observable.create(observer => observer.next(v)).pipe(
takeUntil(timer(time))
);
你不需要让函数时间动态化,我只是做了 (time) => (value) => ... 因为我想写一个像 throttle(1000) 这样的运算符有一个动态的时间范围。你可以只写 (value) => Observable... 如果你想要一个静态时间
3。使用您的函数将您的值映射到时间盒内的可观察对象,并将所有可观察对象合并到 concatAll
中const result$ = source$$.pipe(
map(timeCompletedSource$(time))
concatAll()
);
您找到了 运行 stackblitz here
专业版:制作自定义运算符
const fastThrottle = (time) => (source) => source.pipe(
map(timeCompletedSource$(1000)),
concatAll()
)
const result$ = source$$.pipe(
fastThrottle(1000)
);
我最近发现自己处于同样的情况。 api 我正在消费的每秒只能接受 4 个请求。
这是我想出来的。
一个 rateLimit 管道
import { asyncScheduler, BehaviorSubject, timer, MonoTypeOperatorFunction, Observable } from 'rxjs'
import { filter, map, mergeMap, take } from 'rxjs/operators'
export function rateLimit<T>(
count: number,
slidingWindowTime: number,
scheduler = asyncScheduler,
): MonoTypeOperatorFunction<T> {
let tokens = count
const tokenChanged = new BehaviorSubject(tokens)
const consumeToken = () => tokenChanged.next(--tokens)
const renewToken = () => tokenChanged.next(++tokens)
const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))
return mergeMap<T, Observable<T>>((value: T) =>
availableTokens.pipe(
take(1),
map(() => {
consumeToken()
timer(slidingWindowTime, scheduler).subscribe(renewToken)
return value
}),
),
)
}
而且你可以像这样使用它。 我想从 api 中获取 contractIds$ 中的所有合同。 我只想每 1000 毫秒发送 4 个请求
const contracts$ = contractIds$.pipe(
rateLimit(4, 1000),
mergeMap(contract => this.get(contract.DocumentNumber)),
)
也许这会对您有所帮助:)