zip 的替代方案,它在任何可观察对象发出值时产生值

alternative to zip that produces value whenever any of the observable emit a value

目前 zip 只会在所有压缩的 observable 产生一个值时产生一个值。例如。来自文档:

Merges the specified observable sequences or Promises into one observable sequence by using the selector function whenever all of the observable sequences have produced an element

我正在寻找一个可以压缩一个可观察对象但会生成一个压缩可观察对象序列数组的可观察对象,其中是否所有都产生一个值并不重要..

例如假设我有 tick$、observ1、observ2.. tick$ 总是每 x 秒产生一次价值.. 而 observ1 和 observ2 只是不时产生.. 我希望我的流看起来像

[tick, undefined, observ2Res],
[tick, undefined, undefined],
[tick, observ1Res, observ2Res]
...
...

考虑到 combine latest 取给定 observable 的最新值,它没有 combine latest。

我相信 buffer(或者 sample)可能会让您走上正轨。 buffer 方法接受一个用于定义缓冲区边界的 Observable。生成的流发出在 window 中发出的任何项目(从 RXJS 文档中为 buffer 窃取的示例):

var source = Rx.Observable.timer(0, 50)
  .buffer(function () { return Rx.Observable.timer(125); })
  .take(3);

var subscription = source.subscribe(x => console.log('Next: ', x));

// => Next: 0,1,2
// => Next: 3,4,5
// => Next: 6,7

所以我们现在有一种方法可以在特定时间内获取流发出的所有事件 window。在您的情况下,我们可以使用 tick$ 来描述我们的采样周期,而 observ1observ2 是我们要缓冲的基础流:

const buffered1 = observ1.buffer(tick$);
const buffered2 = observ2.buffer(tick$);

这些流中的每一个都将在每个 tick$ 周期发出一次,并将发出来自底层流的所有发出项目的列表(在该时间段内)。 buffered 流将发出这样的数据:

|--[]--[]--[1, 2, 3]--[]-->

为了得到你想要的输出,我们可以选择只查看每个缓冲结果的最新发射项,如果没有发射数据,我们可以传递null:

const buffered1 = observ1.buffer($tick).map(latest);
const buffered2 = observ2.buffer($tick).map(latest);

function latest(x) {
    return x.length === 0 ? null : x[x.length - 1];
}

我之前演示的样本流现在看起来像这样:

|--null--null--3--null-->

最后,我们可以 zip 这两个流在 tick$ 间隔期间获取 "latest" 发出的数据:

const sampled$ = buffered1.zip(buffered2);

sampled$ 流将通过 tick$ window 从我们的 observ1observ2 流发出最新数据。这是一个示例结果:

|--[null, null]--[null, 1]--[1, 2]-->