RxJs 合并流
RxJs merging streams
我期待合并流并刷新所有订阅者。
出于函数式编程的目的,concat 不会更新当前的可观察对象,我不知道正确的方法是什么。
这是我希望事情发生的方式:
var observable = Rx.Observable.from(['stream1']);
var subscription = observable.subscribe(
function(x) {
console.log('Next: ' + x);
},
function(err) {
console.log('Error: ' + err);
},
function() {
console.log('Completed');
});
observable.concat(Rx.Observable.from(['stream2']));
此代码仅读取第一个流然后完成。
当 concat 时,它会创建一个我并不真正想要的新 Observable,因为我已经订阅了第一个。
我什至无法进入第一个可观察对象,正确的方法是什么?
谢谢!
你想要做的事情并不完全像那样可能,但你基本上有两个选择,都涉及 Subject
:
的使用
1) 手动发射数据
const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();
Rx.Observable.merge(obj$, subj$)
.subscribe(
x => console.log('Next: ' + x),
x => console.log('Error: ' + x),
() => console.log('Complete')
);
subj$.next("stream2");
subj$.next("stream3");
但是:在这种情况下,永远不会调用 complete,因为 Subject
永远不会自行完成 - 所以如果您需要 complete
-处理程序被触发你必须添加一个手册 subj$.complete();
到最后。
2) 通过主题进行多播
const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();
subj$.subscribe(
x => console.log('Next: ' + x),
x => console.log('Error: ' + x),
() => console.log('Complete')
);
obs$.subscribe(x => subj$.next(x));
const obs2$ = Rx.Observable.of("stream2");
obs2$.subscribe(x => subj$.next(x));
在这种情况下,Subject
基本上将充当 "proxy",它只会传播数据,但不会触发错误或完整触发器。
这两种解决方案都不是真正的 "nice" - 但也许您可以更好地概述您的用例,我相信有一个合适的解决方案,不涉及任何复杂的解决方法。
如果你只是想有一种方法从一个永久的 Observable 中持续提供数据,你应该使用 BehaviorSubject
- 它的工作方式是,你可以在它上面发射数据并订阅它同时:
class Service {
public data$ = new BehaviorSubject(someInitialDataOrNull);
public getData() {
makeSomeHttpCall()
.subscribe(data => data$.next(data));
}
}
class Component {
constructor() {
theService.data$.subscribe(data => console.log(data));
}
}
这是 BehaviorSubject
旧文档的 link(它基本上仍然以相同的方式工作,期望 onNext
现在是 next
,等等。 .)
我期待合并流并刷新所有订阅者。
出于函数式编程的目的,concat 不会更新当前的可观察对象,我不知道正确的方法是什么。
这是我希望事情发生的方式:
var observable = Rx.Observable.from(['stream1']);
var subscription = observable.subscribe(
function(x) {
console.log('Next: ' + x);
},
function(err) {
console.log('Error: ' + err);
},
function() {
console.log('Completed');
});
observable.concat(Rx.Observable.from(['stream2']));
此代码仅读取第一个流然后完成。 当 concat 时,它会创建一个我并不真正想要的新 Observable,因为我已经订阅了第一个。
我什至无法进入第一个可观察对象,正确的方法是什么?
谢谢!
你想要做的事情并不完全像那样可能,但你基本上有两个选择,都涉及 Subject
:
1) 手动发射数据
const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();
Rx.Observable.merge(obj$, subj$)
.subscribe(
x => console.log('Next: ' + x),
x => console.log('Error: ' + x),
() => console.log('Complete')
);
subj$.next("stream2");
subj$.next("stream3");
但是:在这种情况下,永远不会调用 complete,因为 Subject
永远不会自行完成 - 所以如果您需要 complete
-处理程序被触发你必须添加一个手册 subj$.complete();
到最后。
2) 通过主题进行多播
const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();
subj$.subscribe(
x => console.log('Next: ' + x),
x => console.log('Error: ' + x),
() => console.log('Complete')
);
obs$.subscribe(x => subj$.next(x));
const obs2$ = Rx.Observable.of("stream2");
obs2$.subscribe(x => subj$.next(x));
在这种情况下,Subject
基本上将充当 "proxy",它只会传播数据,但不会触发错误或完整触发器。
这两种解决方案都不是真正的 "nice" - 但也许您可以更好地概述您的用例,我相信有一个合适的解决方案,不涉及任何复杂的解决方法。
如果你只是想有一种方法从一个永久的 Observable 中持续提供数据,你应该使用 BehaviorSubject
- 它的工作方式是,你可以在它上面发射数据并订阅它同时:
class Service {
public data$ = new BehaviorSubject(someInitialDataOrNull);
public getData() {
makeSomeHttpCall()
.subscribe(data => data$.next(data));
}
}
class Component {
constructor() {
theService.data$.subscribe(data => console.log(data));
}
}
这是 BehaviorSubject
旧文档的 link(它基本上仍然以相同的方式工作,期望 onNext
现在是 next
,等等。 .)