RxJS:(时间)缓冲区,在下一次发射后开始

RxJS: (Time) Buffer that starts after next emittion

我想知道如何使用 RxJs (4/5) 正确地实现它?

-a-- -b----c----d-----------------------------------------------------------e------f---------------------
-5-sec after-"a"--> [abcd]---new 5 sec timer will start when "e" emited-----5 sec-after-"e"->[ef]-

我认为:

.buffer(source$.throttleTime(5000).debounceTime(5000))

在 rxjs 5 中完成这项工作

最好的方法是使用缓冲区。缓冲区有一个关闭条件,您希望在引入新项目 5 秒后关闭条件。因此,假设您有一个源流,您想要的流将是:

source.buffer(source.throttle(5100).debounce(5000));

这是 rxjs 4。我认为 rxjs 的缓冲区运算符略有不同,但想法是一样的。

说明: 节流阀确保在 5100 毫秒内您只会获得第一个 "tick"。去抖将在 5000 毫秒后传播此 "tick",因为此后没有其他 "ticks"。请注意,我选择了 5100 毫秒,因为时间并不总是完美的,如果您对两者都使用 5000 毫秒,去抖动可能会反复延迟,您会感到饥饿。无论如何,您的缓冲区不会丢失数据,只是可以将其分组为大于 5000 毫秒的块。

Rxjs 5 有一个 bufferToggle 运算符,它可能看起来是一个更好的选择,但是,您同时打开和关闭缓冲区的事实可能会变得有风险,并且由于时序问题会使您丢失数据。

尝试了所有 Rxjs 5 缓冲区变体,特别是每 n 秒发出一次是否为空的 bufferTime,我最终推出了自己的 bufferTimeLazy:

function bufferTimeLazy(timeout) {
  return Rx.Observable.create(subscriber => {
    let buffer = [], hdl;
    return this.subscribe(res => {
      buffer.push(res);
      if (hdl) return;

      hdl = setTimeout(() => {
        subscriber.next(buffer);
        buffer = [];
        hdl = null;
      }, timeout);

    }, err => subscriber.error(err), () => subscriber.complete());
  });
};

// add operator
Rx.Observable.prototype.bufferTimeLazy = bufferTimeLazy;

// example
const click$ = Rx.Observable.fromEvent(document, 'click');

click$.bufferTimeLazy(5000).subscribe(events => {
  console.log(`received ${events.length} events`);
});

示例: https://jsbin.com/nizidat/6/edit?js,console,output

想法是在缓冲区中收集事件并在第一个事件后 n 秒发出缓冲区。一旦发出,清空缓冲区并保持休眠状态,直到下一个事件到达。

如果您不想向 Observable.prototype 添加运算符,只需调用函数:

bufferTimeLazy.bind(source$)(5000)

编辑: 好的,所以 Rxjs 5 还不错:

var clicks = Rx.Observable.fromEvent(document, 'click').share();
var buffered = clicks.bufferWhen(() => clicks.delay(5000));
buffered.subscribe(x => console.log(`got ${x.length} events`));

实现相同。注意 share() 以避免重复点击订阅 - YMMV。

我正在使用 RxJS 6,无法轻易找到 5 的文档。但是,这是一个很好的问题。这是我的结果,它也在 real example 重现 Angular Material.

中的错误中得到证明
source$ = source$.pipe(buffer(source$.pipe(debounceTime(5000))));

正如 Trevor 提到的,在 RXJS 6 中没有正式的方法,但显然你需要使用 debounce + buffer 才能达到那个结果。

为了使事情正常进行,在 Typescript 和类型推断中,我创建了一个名为 bufferDebounce 的自定义 OperatorFunction这个运算符更容易使用和理解。

带有类型推断的片段

type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;

const bufferDebounce: BufferDebounce = debounce => source =>
  new Observable(observer => 
    source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
      next(x) {
        observer.next(x);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
  })
// [as many sources until no emit during 500ms]
source.pipe(bufferDebounce(500)).subscribe(console.log) 

您可以在这个工作示例中尝试:https://stackblitz.com/edit/rxjs6-buffer-debounce