用 LatestFrom 缓冲

buffered withLatestFrom

我需要类似于 withLatestFrom 的东西,对应于下图:

---------A-----------------B--
-1-2-3------4------5-6-7-8----

--------[A,[123]]---------------[B,[45678]]

有什么方法可以用 RxJS 实现这种行为吗?

This 似乎有效。它基于 sample 运算符,但还有其他方法可以做到这一点。

var ta_message = document.getElementById('ta_message');
var ta_result = document.getElementById('ta_result');

function emits ( who, who_ ) {return function ( x ) {
 who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}

function pushValue (arr, el){arr.push(el); return arr;}

var sourceCounter = 0;
var samplerCounter = 0;
var source$ = Rx.Observable
  .fromEvent(document.getElementById('source'), 'click')
  .map(function(){return sourceCounter++;})
  .do(emits(ta_source, 'source'))
  .share();
var sampler$ = Rx.Observable
  .fromEvent(document.getElementById('sampler'), 'click')
  .map(function(){return String.fromCharCode(65+samplerCounter++);})
  .do(emits(ta_sampler, 'sampler'))
  .share();

var bufferedSource$ = sampler$
  .startWith('0')
  .flatMapLatest(function (x){
    console.log("x",x)
    return source$.scan(pushValue,[])
  })  
  .do(emits(ta_sampler, 'bufferedSource$'));

var sampledSource$ = bufferedSource$
  .sample(sampler$)
  .do(emits(ta_result, 'sampledSource'))
  .withLatestFrom(sampler$, 
        function (bufferedValues, samplerValue){
          return "" +samplerValue + bufferedValues;
        })
  .do(emits(ta_result, 'result'))

//sampler$.connect();
sampledSource$.subscribe(function(){});

很抱歉我不是 javascript 编码员,但这适用于 .NET。希望你能翻译。

var xs = new Subject<string>();
var ys = new Subject<string>();

var qs =
    xs.Publish(pxs =>
        ys.Buffer(() => pxs)
            .Zip(pxs, (numbers, letter) => new { letter, numbers }));

.Publish(pxs => 运算符采用单个可观察对象,仅订阅一次并在 lambda 中共享该订阅。它防止对源进行多次订阅,并同步 lambda 中 pxs 的值的生成。

ys.Buffer(() => pxs 获取 ys 的所有值,并将这些值转换为一系列列表,这些列表在 pxs 生成值时被分解。

最后 .Zip(...)pxs 中获取值并将它们与 .Buffer(...) 生成的列表配对。

qs.Subscribe(q => Console.WriteLine(q));

ys.OnNext("1");
ys.OnNext("2");
ys.OnNext("3");
xs.OnNext("A");
ys.OnNext("4");
ys.OnNext("5");
ys.OnNext("6");
ys.OnNext("7");
ys.OnNext("8");
xs.OnNext("B");

ys.OnCompleted();
xs.OnCompleted();

它产生:


这是翻译后的 javascript 版本:

var qs = xs.publish(
    pxs =>
      ys.buffer(() => pxs).zip(pxs, (numbers, letter) => [ letter, numbers ])
);