RxJS 每秒拨打电话不超过一次,但不要丢失任何电话

RxJS Make call no more than once per second, but don't lose any calls

基本上我想创建一个队列。像

const queue = new BehaviorSubject([])
queue.subscribe((args) => someCall(args))

我可能打电话的地方

queue.next({arg1, arg2, arg3})

在几个地方,有时很快一个接一个。我不能使用 throttledebounce 因为我不能丢失中间调用。我需要每次调用都被调用,但每秒不超过 1 个。如果两个人要在一秒钟之内开火,那么一个人就必须等待 1 秒钟。如果一秒内有3个开火,一个会等一秒,另一个会等2秒。

您可以使用 completion of an observable in combination with combineAll.

combineAll 将发出下一个可观察对象,当前一个已完成时

1.创建您的主题

const source$$ = new Subject();

2。提供一个将您的值映射到 1000 毫秒后完成的 Observable 的函数

const timeCompletedSource$ = (time) => (value) => Observable.create(observer => observer.next(v)).pipe(
  takeUntil(timer(time))
);

你不需要让函数时间动态化,我只是做了 (time) => (value) => ... 因为我想写一个像 throttle(1000) 这样的运算符有一个动态的时间范围。你可以只写 (value) => Observable... 如果你想要一个静态时间

3。使用您的函数将您的值映射到时间盒内的可观察对象,并将所有可观察对象合并到 concatAll

const result$ = source$$.pipe(
  map(timeCompletedSource$(time))
  concatAll()
);

您找到了 运行 stackblitz here

专业版:制作自定义运算符

const fastThrottle = (time) => (source) => source.pipe(
  map(timeCompletedSource$(1000)),
  concatAll()
)

const result$ = source$$.pipe(
  fastThrottle(1000)
);

我最近发现自己处于同样的情况。 api 我正在消费的每秒只能接受 4 个请求。

这是我想出来的。

一个 rateLimit 管道

import { asyncScheduler, BehaviorSubject, timer, MonoTypeOperatorFunction, Observable } from 'rxjs'
import { filter, map, mergeMap, take } from 'rxjs/operators'

export function rateLimit<T>(
  count: number,
  slidingWindowTime: number,
  scheduler = asyncScheduler,
): MonoTypeOperatorFunction<T> {
  let tokens = count
  const tokenChanged = new BehaviorSubject(tokens)
  const consumeToken = () => tokenChanged.next(--tokens)
  const renewToken = () => tokenChanged.next(++tokens)
  const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))

  return mergeMap<T, Observable<T>>((value: T) =>
    availableTokens.pipe(
      take(1),
      map(() => {
        consumeToken()
        timer(slidingWindowTime, scheduler).subscribe(renewToken)
        return value
      }),
    ),
  )
}

而且你可以像这样使用它。 我想从 api 中获取 contractIds$ 中的所有合同。 我只想每 1000 毫秒发送 4 个请求

const contracts$ = contractIds$.pipe(
  rateLimit(4, 1000),
  mergeMap(contract => this.get(contract.DocumentNumber)),
)

也许这会对您有所帮助:)