处方药。结合最新和一次

RxJs. Combining latest and once

我有一个 UI 这样的:

哪里可以

所需的逻辑是:

我正在尝试用 RxJs 实现这样的逻辑。 我有 filteredUsers$addedUsers$ 流生成匹配过滤器的用户并相应地拖动用户。

我需要这样组合它们:

        Observable
            .<OPERATOR>(filteredUsers$, addedUsers$)
            .subscribe(([filteredUsers, addedUsers]) => {
                // When filteredUsers$ fires:
                //   filteredUsers is value from stream
                //   addedUsers == null

                // When addedUsers$ fires:
                //   filteredUsers is latest available value
                //   addedUsers is value from stream

                redrawChart(/* combining users */)
            });

有什么办法可以实现吗?

时序:

Filtered: -  a  -  -  -  -  a  -  ->
Added   : -  -  b  -  b  -  -  -  ->
Result  : -  a  ab -  ab -  a  -  ->

您有 combineLatest 运算符,它基本上可以执行您所描述的操作。它结合了两个可观察值并为您提供两个流的最新值。

所以:

--a--b-----c--- -x-----d-----p- -结合最新- --a--b-b---c-c x x d d p

如果我理解正确的话,这应该可以让你做你想做的事。

这是官方文档link: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/combinelatest.md

最终我通过添加额外的主题来完成:

var filteredUsers$ = ... // users from filter
var addedUsers$ = ... // users dragged on chart
var usersToDraw$ = new Subject();

订阅:

usersToDraw$
    .subscribe(usersToDraw => {
        redrawChart(usersToDraw);
    });

filteredUsers$
    .subscribe(filteredUsers => {
        usersToDraw$.next(filteredUsers);
    });

Observable
    .combineLatest(filteredUsers$, addedUsers$)
    .filter(([filteredUsers, addedUsers]) => addedUsers != null)
    .subscribe(([filteredUsers, addedUsers]) => {
        // we 'clear' stream so the same users won't be added twice 
        addedUsers$.next(null);

        usersToDraw$.next(/* combining users */);
    });

更新

可以使用 withLatestFrom 改进解决方案(感谢@nova)

usersToDraw$
    .subscribe(usersToDraw => {
        redrawChart(usersToDraw);
    });

filteredUsers$
    .subscribe(filteredUsers => {
        usersToDraw$.next(filteredUsers);
    });

addedUsers$
    .withLatestFrom(filteredUsers$)
    .subscribe(([addedUsers, filteredUsers]) => {
        usersToDraw$.next(/* combining users */);
    });

如果您希望仅在 addUsers$ 触发时填充最终流,而最新的可能是一个解决方案:

因此,在您的情况下,addUsers$ 可能是第一个流。 您可以试试下面的代码:

 let firstObservable$ = Observable.from([1, 2, 3, 4])
    .zip(Observable.interval(50), (a, b) => {
      return a;
    });

  let secondObservable$ = Observable.from([5, 6])
    .zip(
      Observable.interval(70), (a, b) => {
        return a;
      });

  firstObservable$
    .withLatestFrom(secondObservable$, (f, s) => ({ a: f, b: s }))
    .subscribe(x => {
      console.log('result: ', x);
    });

第一个可观察对象每 50 毫秒从数组​​中发出一个值。 每 75 毫秒第二次可观察到。

打印的值为{a: 2, b: 5} {a: 3, b: 6} {a: 4, b: 6}

因为 1 在 5 之前发出,我们失去了对 (1,5)!

我不清楚,但如果另一个流未发出,则 addUsers$ 中缺少一对可能是您不希望的行为。 如果您使用初始值启动第二个流,然后过滤掉任何您不想要的结果,您就可以克服这个问题。