RxJS 的队列运算符

Queue operator for RxJS

RxJS 中是否有一个运算符允许我缓冲项目并在信号可观察触发时将它们一个接一个地放出?有点像 bufferWhen,但不是在每个信号上转储整个缓冲区,而是转储每个信号的特定数量。它甚至可以转储可观察信号发出的数字。

Input observable:  >--a--b--c--d--|
Signal observable: >------1---1-1-|
Count in buffer:   !--1--21-2-121-|
Output observable: >------a---b-c-|

是的,你可以使用zip做你想做的事:

const input = Rx.Observable.from(["a", "b", "c", "d", "e"]);
const signal = new Rx.Subject();
const output = Rx.Observable.zip(input, signal, (i, s) => i);
output.subscribe(value => console.log(value));
signal.next(1);
signal.next(1);
signal.next(1);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

事实上,zipthis GitHub issue 中用作与缓冲相关的示例。

如果您想使用信号的发射值来确定要释放多少缓冲值,您可以这样做:

const input = Rx.Observable.from(["a", "b", "c", "d", "e"]);
const signal = new Rx.Subject();
const output = Rx.Observable.zip(
  input,
  signal.concatMap(count => Rx.Observable.range(0, count)),
  (i, s) => i
);
output.subscribe(value => console.log(value));
signal.next(1);
signal.next(2);
signal.next(1);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

window可以用来分隔时间线。 takeLast 用于保存输出。

    let signal = Rx.Observable.interval(1000).take(4);

    let input = Rx.Observable.interval(300).take(10).share();

    let output = input
        .do(value => console.log(`input = ${value}`))
        .window(signal)
        .do(() => console.log(`*** signal : end OLD and start NEW subObservable`))
        .mergeMap(subObservable => {
            return subObservable.takeLast(100);
        })
        .share()

    output.subscribe(value => console.log(`    output = ${value}`));

    Rx.Observable.merge(input.mapTo(1), output.mapTo(-1))
        .scan((count, diff) => {
            return count + diff;
        }, 0)
        .subscribe(count => console.log(`            count = ${count}`));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

结果:

22:28:37.971 *** signal : end OLD and start NEW subObservable  
22:28:38.289 input = 0  
22:28:38.292             count = 1  
22:28:38.575 input = 1  
22:28:38.576             count = 2  
22:28:38.914 input = 2  
22:28:38.915             count = 3  
            <signal received>
22:28:38.977     output = 0  
22:28:38.979             count = 2  
22:28:38.980     output = 1  
22:28:38.982             count = 1  
22:28:38.984     output = 2  
22:28:38.986             count = 0  
22:28:38.988 *** signal : end OLD and start NEW subObservable  
22:28:39.175 input = 3  
22:28:39.176             count = 1  
22:28:39.475 input = 4  
22:28:39.478             count = 2  
22:28:39.779 input = 5  
22:28:39.780             count = 3  
            <signal received>
22:28:39.984     output = 3  
22:28:39.985             count = 2  
22:28:39.986     output = 4  
22:28:39.988             count = 1  
22:28:39.989     output = 5  
22:28:39.990             count = 0  
22:28:39.992 *** signal : end OLD and start NEW subObservable  
22:28:40.075 input = 6  
22:28:40.077             count = 1  
22:28:40.377 input = 7  
22:28:40.378             count = 2  
22:28:40.678 input = 8  
22:28:40.680             count = 3  
22:28:40.987 input = 9  
22:28:40.990             count = 4  
            <input completed>
22:28:40.992     output = 6  
22:28:40.993             count = 3  
22:28:40.995     output = 7  
22:28:40.996             count = 2  
22:28:40.998     output = 8  
22:28:40.999             count = 1  
22:28:41.006     output = 9  
22:28:41.007             count = 0