使用事件驱动超时清除 Rx.Observable bufferCount?

Clear an Rx.Observable bufferCount with an event driven Timeout?

我正在使用 rxjs 5.0:

如何在此缓冲区上设置超时。这样它会在 5 秒内没有 keyup 事件发生时清除 bufferCount (11)?

var keys = Rx.Observable.fromEvent(document, 'keyup');
var buffered = keys.bufferCount(11,1);
buffered.subscribe(x => console.log(x));

您可以附加一个 timeoutWith,它可以 return 一个 fresh buffered 在一定的超时后(在您的情况下为 5 秒)。

const keys$ = Rx.Observable.fromEvent(document, "keyup")
  .map(ev => ev.keyCode|| ev.which); // this is just to have a readable output here in the SO-console
  
const buffered$ = keys$
  .bufferCount(3,1)  // replaced your 11 with 3 for easy demonstration
  .timeoutWith(2000, Rx.Observable.defer(() => { // replaced 5 with 2 seconds (easier to test here)
     console.log("New Buffer!");
     return buffered$;
  }));

buffered$.subscribe(console.log);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>


作为一项改进,这甚至可以增强为仅在 第一个 笔划上启动流,否则我们将有一个常量超时 运行(不重要,但仍然可以避免)。

const keys$ = Rx.Observable.fromEvent(document, "keyup")
  .map(ev => ev.keyCode|| ev.which); // this is just to have a readable output here in the SO-console

const buffered$ = keys$
  .take(1)
  .switchMap(firstKey => {
    console.log("New Buffer!");
    return keys$
     .startWith(firstKey)
     .bufferCount(3,1)  // replaced your 11 with 3 for easy demonstration
     .timeoutWith(2000, Rx.Observable.defer(() => buffered$)); // replaced 5 with 2 seconds (easier to test here)
  });

buffered$.subscribe(console.log);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

我有另一个(可能更容易理解)解决方案,使用 windowswitchMap():

var keys = Rx.Observable.fromEvent(document.getElementById('myinput'), 'keyup')
  .map(event => event.keyCode)
  .share();

var buffered = keys
  .window(keys.debounceTime(5000))
  .switchMap(observable => observable.bufferCount(5, 1))
  .filter(buffer => buffer.length === 5);

buffered.subscribe(x => console.log(x));

查看演示:https://jsbin.com/cakoru/17/edit?js,console,output

当你至少 5 秒不输入时,window() 运算符会创建一个新的 Observable,它在 switchMap() 内部订阅并与新的 .bufferCount() 运算符链接。

这是我的做法:

const keys$ = Rx.Observable.fromEvent(document, 'keyup').map(ev => ev.keyCode|| ev.which);

keys$
  .debounceTime(5000)
  .startWith({})
  .switchMap(x => keys$.bufferCount(11, 1))
  .subscribe(x => console.log(x));

这里我们有一个流,每次输入停止时产生一个值(以虚拟值开始),switchMaps 到 bufferCount。