基于另一个上采样 Rx observable

Upsample an Rx observable based on another

我是响应式扩展的新手,所以请原谅我的无知。我有两个可观察对象 obj1obj2 return 项。在特定时间 windows T(我想是缓冲区)我想对最稀疏的项目序列进行上采样。 请参阅示意图示例。

            Time of returned objects   
        obj1   t1  t2 t3 t4 t5 t6 ...   
        obj2  t1         t2       t3 ...
     os_obj2   t1  t1 t2 t2 t2 t3 ...   
 time_window <-----T-----><-----T----->

在上采样过程中,我们将 obj2 returned 项目放置在相同的位置(时间) obj1 的项目存在。

我们应该在特定 obj1 项目时间复制什么 obj2 项目的选择取决于哪个 obj2 项目更接近特定 obj1 项目时间。

您能否提出实现该目标的方法(可能的功能或功能管道)?

谢谢。

EDIT 我刚刚注意到你想要发射之前没有被任何其他 observable 发射的项目。这是无法实现的,因为它需要预见未来。

如果 obj2 定期发出可预测的项目,那么您可以做点什么。让 obj2 提前半个周期发出这些项目,这样你就可以只使用来自 obj2 的最新项目,而不需要预测未来的项目。

以下示例将在 obj1 的发射时间发射来自 obj2 的最新项目。最简单的是使用 Observable.withLatestFrom.

var obj1 = Rx.Observable.interval(500).map(x => "obj1." + x).take(10);
var obj2 = Rx.Observable.interval(300).map(x => "obj2." + x).take(10);

obj2.withLatestFrom(obj1).pluck(1).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.11/Rx.umd.js"></script>

PS:一开始我以为可以用sample,但是过采样对我没用

可观察流中的每个值都有两个元素:它出现的时间和值。所以让我们稍微改变一下你的原理图:

Timeline  0--1--2--3--4--5--6--7--8...
XObserve  x0-x1-x2-x3-x4-x5-x6-x7-x8...
YObserve  y0-------y1-------y2------...

所以我们有两个 observables,一个每秒发出一个值(为简单起见,假设时间线是秒级),另一个每三秒发出一个值。考虑到所有这些,您是否希望第三个流 ZObserve 使用上采样技术以 X 流的速率为 Y 流生成值,就像这样?

Timeline  0--1--2--3--4--5--6--7--8-...
XObserve  x0-x1-x2-x3-x4-x5-x6-x7-x8...
YObserve  y0-------y1-------y2------...
ZObserve  y0-y0-y1-y1-y1-y2-y2-y2-??...

如您所见,ZObserveT8 的值是 ??,因为我不知道它是什么。我也看不出在 T2T5 时我如何预测 Y 流中的下一个值将是什么。如果有关于它们将是什么的可靠规则,那么我们可以相应地对其进行编码,生成一个新的流 PredictedY,并按照@Tamas 的建议使用 WithLatestFrom 来生成 Z 流。

我建议使用 Operator CombineLatest 并可能在之后使用 Sample