RxJs 合并流

RxJs merging streams

我期待合并流并刷新所有订阅者。

出于函数式编程的目的,concat 不会更新当前的可观察对象,我不知道正确的方法是什么。

这是我希望事情发生的方式:

var observable = Rx.Observable.from(['stream1']);
var subscription = observable.subscribe(
    function(x) {
        console.log('Next: ' + x);
    },
    function(err) {
        console.log('Error: ' + err);
    },
    function() {
        console.log('Completed');
    });
observable.concat(Rx.Observable.from(['stream2']));

此代码仅读取第一个流然后完成。 当 concat 时,它会创建一个我并不真正想要的新 Observable,因为我已经订阅了第一个。

我什至无法进入第一个可观察对象,正确的方法是什么?

谢谢!

你想要做的事情并不完全像那样可能,但你基本上有两个选择,都涉及 Subject:

的使用

1) 手动发射数据

const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();

Rx.Observable.merge(obj$, subj$)
    .subscribe(
        x => console.log('Next: ' + x),
        x => console.log('Error: ' + x),
        () => console.log('Complete')
    );

subj$.next("stream2");
subj$.next("stream3");

但是:在这种情况下,永远不会调用 complete,因为 Subject 永远不会自行完成 - 所以如果您需要 complete-处理程序被触发你必须添加一个手册 subj$.complete(); 到最后。


2) 通过主题进行多播

const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();

subj$.subscribe(
    x => console.log('Next: ' + x),
    x => console.log('Error: ' + x),
    () => console.log('Complete')
);

obs$.subscribe(x => subj$.next(x));
const obs2$ = Rx.Observable.of("stream2");
obs2$.subscribe(x => subj$.next(x));

在这种情况下,Subject 基本上将充当 "proxy",它只会传播数据,但不会触发错误或完整触发器。

这两种解决方案都不是真正的 "nice" - 但也许您可以更好地概述您的用例,我相信有一个合适的解决方案,不涉及任何复杂的解决方法。


如果你只是想有一种方法从一个永久的 Observable 中持续提供数据,你应该使用 BehaviorSubject - 它的工作方式是,你可以在它上面发射数据并订阅它同时:

class Service {
    public data$ = new BehaviorSubject(someInitialDataOrNull);

    public getData() {
        makeSomeHttpCall()
            .subscribe(data => data$.next(data));
    }
}

class Component {
    constructor() {
        theService.data$.subscribe(data => console.log(data));
    }
}

这是 BehaviorSubject 旧文档的 link(它基本上仍然以相同的方式工作,期望 onNext 现在是 next,等等。 .)