在 RxJs 中,你如何制作一个 throttled buffer operator?

In RxJs how can you make a throttled buffer operator?

我有以下需求:

我想在 leadingtrailing 都设置为 true 的情况下使用 rxjs 限制函数调用。但是,我不想忽略被跳过的 fn 调用;我想将所有函数调用的输出作为数组包含在油门的开始和结束处。有点像结合 throttlebuffer

举个更好的例子:

我只想发出每 1 秒的点击次数。如果有很长的停顿,当下一次点击发生时,立即发出该点击的 x 和 y(在一个数组中,就好像它被缓冲了一样)。从第一次点击到 1 秒后的所有点击,都将被观察为第二个到达的数组。

这是一些我试过但似乎不起作用的代码,可在 this codesandbox:

上试用
import { pipe, asyncScheduler, interval } from "rxjs";
import { publish, throttleTime, buffer, take } from "rxjs/operators";

const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    // publish this observable so it can be shared
    publish((observable$) => {
      // get invokations, and throttle them
      const throttleCalls$ = observable$.pipe(
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        }),
        
      );

      // buffer observable, emitting an array of accumulated values for each fn call at each observed throttle
      return observable$.pipe(
        buffer(throttleCalls$)
      );
    })
  );
};


const clicks$ = interval(50);


clicks$.pipe(throttledBuffer(100),take(4)).subscribe({
  next: (value) => {
    console.log(value);
  }
});

在上面的代码片段中,我希望得到的结果是:

[0]
[1, 2]
[3]
[4, 5]

然而,事情是这样的:

[]
[0,1,2]
[]
[3,4,5]

好像是在节气门发出声音之前发生了缓冲;或者在它之后跳过它。有人成功地制作了这样的自定义运算符吗?

TLDR;

这里有一个方法:

const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    publish((observable$) => {
      const throttleCalls$ = observable$.pipe(
        // tap((v) => console.log("in throttle", v)),
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        }),
        // tap(() => console.log("after throttle"))
      );

      const src$ = observable$.pipe(share());
      return merge(
        src$.pipe(ignoreElements()),
        src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
      );
    })
  );
};

const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
  next: (value) => {
    console.log("p", value);
  }
});

/* 
console output:

p 
(1) [0]
p 
(2) [1, 2]
p 
(1) [3]
p 
(2) [4, 5]
*/

您可以取消注释 tap 以检查发生了什么。

可以找到工作演示 here


为什么上述解决方案有效以及前一个解决方案出了什么问题

了解 publish 运算符

首先,重要的是要强调 how the publish operator works:

export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
  return selector ? connect(selector) : multicast(new Subject<T>());
}

在这种情况下,我们对 selector 方法很感兴趣。但是,如您所见,如果您只使用 publish(),它将使用 multicastSubject。作为旁注,publishReplay 也使用 multicast,但使用 ReplaySubject 而不是 Subject

connect函数定义as follows:

// `source` - the further-up observable in the chain
// `subscriber` - the newly created subscriber
/* 
To get a better understanding of what `subscriber` and `source` really are, let's consider this quick example

const o$ = new Observable(subscriber => {});
o$.pipe(
  map(...),
).subscribe({ next: v => console.log('hello', v) })

In the `map`'s implementation, when the `map` observable is subscribed, it will call again `source.subscribe(someSubscriber)`.
In that case, `someSubscriber` is a `{ next: v => console.log('hello', v) }`(roughly) and `source` is `o$`.
So, in `subscriber => {}` from `Observable`'s callback, the `subscriber` will actually be `someSubscriber`.
*/

// It is a `Subject` instance
const subject = connector();

from(selector(fromSubscribable(subject))).subscribe(subscriber);
subscriber.add(source.subscribe(subject));

fromSubscribable(subject) 将 return 一个 Observable 实例,当订阅该实例时,订阅者将被添加到 subject 的订阅者列表中。因此,您提供给 publish 的回调函数的参数将是我之前提到的 Observable 实例。所有这一切的要点是要知道在该回调 中创建的每个 订阅者都将成为 subject 订阅者列表的一部分。因此,对于 subscriber.add(source.subscribe(subject))subject 将订阅 source,这意味着来自 source 的所有值都将由 subject 接收使用,这也意味着这些值将发送给该列表中的所有订阅者

因此,要点是您在该回调函数中创建订阅者的 order 很重要。

为什么会出现问题

现在让我们先看看问题出在哪里:

const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    publish((observable$) => {
      const throttleCalls$ = observable$.pipe(
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        })
      );

      return observable$.pipe(buffer(throttleCalls$));
    })
  );
};

const clicks$ = interval(50);

clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
  next: (value) => {
    console.log("p", value);
  }
});

/* 
console output:

p
[]
p 
(3) [0, 1, 2]
p 
[]
p 
(3) [3, 4, 5]
*/

通过 returning observable$.pipe(buffer(throttleCalls$)),本质上与

相同
observable$.pipe(buffer(throttleCalls$)).subscribe(subscriber /* #1 */);

问题终于来了:)。问题是throttleCalls$会先被订阅,因为这背后的概念可以归结为f(g(x)),其中g(x)会先被调用。因为 throttleCalls$ 先订阅,这意味着 subject 实例会将此订阅者注册为列表中的 第一个 。结果,subscriber#1 将是第二个。这意味着当源发出一些东西时,从throttleCalls$'订阅创建的订阅者将是第一个接收值的订阅者,这就是为什么 你会先得到一个空数组
请记住,Subject 将值同步发送给它的订阅者。这意味着在缓冲区发出 [] 之后,第二个订阅者将立即收到该值并且 buffer 的数组将为 [0]。由于一秒还未过去,12 将到达,当第二秒最终过去时,buffer 的数组将为 [0, 1, 2]。然后,再次发出 3,但是接收它的 第一个订阅者 将属于 throttleCalls$,即 buffer的通知程序,因此,我们再次得到一个空数组。

所以,问题是订阅者添加到 Subject 的订阅者列表的顺序不是我们想要的。

为什么该解决方案有效

让我们再次看看 TLDR 部分的解决方案:

const throttledBuffer = (throttleTimeMs) => {
  return pipe(
    publish((observable$) => {
      const throttleCalls$ = observable$.pipe(
        // tap((v) => console.log("in throttle", v)),
        throttleTime(throttleTimeMs, asyncScheduler, {
          leading: true,
          trailing: true
        }),
        // tap(() => console.log("after throttle"))
      );

      const src$ = observable$.pipe(share());
      return merge(
        src$.pipe(ignoreElements()),
        src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
      );
    })
  );
};

const clicks$ = interval(50);
clicks$.pipe(throttledBuffer(100), take(4)).subscribe({
  next: (value) => {
    console.log("p", value);
  }
});

/* 
console output:

p 
(1) [0]
p 
(2) [1, 2]
p 
(1) [3]
p 
(2) [4, 5]
*/

有了上一节的知识点,我们一步一步来了解是怎么回事。我们知道问题出在订单上,所以我们可以解决这个问题。我们可以通过使用 merge 来获得我们想要的顺序:

merge(
  // `ignoreElements()` will ignore `next`, but will let `error`/`complete` notification in
  // By providing this as the first argument, the we can be sure that the first emitted value 
  // will be **first** part of the buffer, and then the `buffer`'s notifier will emit
  src$.pipe(ignoreElements()),

  // Ensuring the `buffer`'s notifier is the **second** in the list
  src$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);

但是,为了确保一切按预期工作,还有一件事要做,那就是

const src$ = observable$.pipe(share());

share() 所做的是在数据消费者(2 个 merge 参数)和数据生产者(observable$)之间添加一个 Subject 实例。

我们需要这个的原因最好借助这张小图来解释:

                    s1 (the `observable$` argument)
                  /  \
                s2    throttleCalls$
                /  \
(1)ignoreElements    (2)buffer(throttleCalls$)

s2 - the Subject we got from `share()`

所以,当s1收到一个值时,s2会先收到,然后是(1),然后是(2),最后是throttleCalls$。这就是我们想要的,因为我们首先添加到缓冲区,然后 buffer 的通知程序(即 throttleCalls$)发出。

现在您可能想知道,为什么它不能像这样工作?

// const src$ = observable$.pipe(share());
return merge(
  observable$.pipe(ignoreElements()),
  observable$.pipe(/* tap(console.log), */ buffer(throttleCalls$))
);

上述片段的图表如下所示:

               s1 (the `observable$` argument)
              /  \
(1)ignoreElements  (2)buffer(throttleCalls$)

好吧,我们又回到了最初遇到的同样问题,throttleCalls$ 先订阅,因为 (1)(2) 完全独立.订阅者列表(大致)如下所示:

[
  ignoreElements,
  throttleCalls$,
  (2) - the `buffer`'s source
]