Rxjs - 订阅相互依赖的可观察对象
Rxjs - Subscribing to interdependent observables
我正在学习 angular 2 和 rxjs。我有 3 个变量,A B C.
- B 取决于 A 的值
- C 取决于 A 和 B 的值
我正在尝试设置可观察对象,以便:当 A 更新时,B 和 C 将自动更新。当 B 更新时,C 将自动更新。我尝试了两种设置,但都不尽如人意。
- 第一个设置:B 订阅可观察的 A; C 订阅可观察的 B withlatestfrom A。A 中的变化确实向下级联到 B 然后到 C,但来自 A 的值不是最新的。
- 第二种设置:B 订阅可观察的 A; C 订阅了 A 和 B 的 combineLatest observable。此设置有效,但我获得了 C 的两个更新,首先来自 B,然后来自 A。
我如何设置我的可观察对象/订阅,以便在更新 A 时,C 只会使用 A 和 B 的最新值更新一次?
编辑 - 添加代码
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();
A_Observable.subscribe(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
B.next(newB);
});
// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable);
combineAllObservable.subscribe(function(data) {
console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
// SATISFACTORY RESULT
setTimeout(function(){
console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
代码之所以如此,是因为默认情况下 RxJs 运行s 代码是同步的。 withLatestFrom
案例的事件链:
- 您推送新的
A
值:A.next(2);
- RxjS看到
A
有两种用法:
B
-订阅(先到先得)
withLatestFrom
(第二)
- RxJs 调用
B
-订阅
- 它推送
B
的新值
- RxjS看到有
B
的用法:C
-subscription
- RxJs 调用
C
-订阅(旧值为 A
!)
A
的新值被拉入 withLatestFrom
但 C
- 订阅未被触发(因为它仅由 B
更新触发)
可能的解决方案是添加debounceTime(0)
。当withLatestFrom
中的值已经更新时,它会强制C
-订阅运行异步。
有关 RxJs Scheduler
的更多信息 google。
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();
A_Observable.subscribe(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
B.next(newB);
});
// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.debounceTime(0).withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable).debounceTime(0);
combineAllObservable.subscribe(function(data) {
console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
// SATISFACTORY RESULT
setTimeout(function(){
console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
我想补充两点:
withLatestFrom
方法在您的情况下更好,因为依赖图看起来像 C -> B -> A
并且当 A 更新时 B 也会更新。如果 A
和 B
是独立的,那么 combineLatest
将是更好的选择。
Subject
B
不需要。你可以像这样重写代码(没有debounceTime
!)
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B_Observable = A_Observable.map(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
return newB;
});
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
好的,所以我用你的代码进行了测试。调整以改变一些事情。您可以在此处查看代码:https://jsbin.com/zonedirogi/edit?html,js,console,output。主要的变化实际上是用 A 的最新版本代替 B,而是从 B 的最新版本更改为 A。然后它按预期工作。
仅供参考,您不需要使用 asObservable()
将 subject 转换为 observable。
var a = new Rx.BehaviorSubject(1);
var b = new Rx.BehaviorSubject(10);
a.subscribe(x => {
b.next(x * 10);
});
a.withLatestFrom(b).subscribe(x => {
console.log(x)
})
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
a.next(2);
}, 1000);
我正在学习 angular 2 和 rxjs。我有 3 个变量,A B C.
- B 取决于 A 的值
- C 取决于 A 和 B 的值
我正在尝试设置可观察对象,以便:当 A 更新时,B 和 C 将自动更新。当 B 更新时,C 将自动更新。我尝试了两种设置,但都不尽如人意。
- 第一个设置:B 订阅可观察的 A; C 订阅可观察的 B withlatestfrom A。A 中的变化确实向下级联到 B 然后到 C,但来自 A 的值不是最新的。
- 第二种设置:B 订阅可观察的 A; C 订阅了 A 和 B 的 combineLatest observable。此设置有效,但我获得了 C 的两个更新,首先来自 B,然后来自 A。
我如何设置我的可观察对象/订阅,以便在更新 A 时,C 只会使用 A 和 B 的最新值更新一次?
编辑 - 添加代码
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();
A_Observable.subscribe(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
B.next(newB);
});
// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable);
combineAllObservable.subscribe(function(data) {
console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
// SATISFACTORY RESULT
setTimeout(function(){
console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
代码之所以如此,是因为默认情况下 RxJs 运行s 代码是同步的。 withLatestFrom
案例的事件链:
- 您推送新的
A
值:A.next(2);
- RxjS看到
A
有两种用法:B
-订阅(先到先得)withLatestFrom
(第二)
- RxJs 调用
B
-订阅- 它推送
B
的新值
- RxjS看到有
B
的用法:C
-subscription - RxJs 调用
C
-订阅(旧值为A
!)
- 它推送
A
的新值被拉入withLatestFrom
但C
- 订阅未被触发(因为它仅由B
更新触发)
可能的解决方案是添加debounceTime(0)
。当withLatestFrom
中的值已经更新时,它会强制C
-订阅运行异步。
有关 RxJs Scheduler
的更多信息 google。
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();
A_Observable.subscribe(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
B.next(newB);
});
// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.debounceTime(0).withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable).debounceTime(0);
combineAllObservable.subscribe(function(data) {
console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
// SATISFACTORY RESULT
setTimeout(function(){
console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
我想补充两点:
withLatestFrom
方法在您的情况下更好,因为依赖图看起来像C -> B -> A
并且当 A 更新时 B 也会更新。如果A
和B
是独立的,那么combineLatest
将是更好的选择。Subject
B
不需要。你可以像这样重写代码(没有debounceTime
!)
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B_Observable = A_Observable.map(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
return newB;
});
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
好的,所以我用你的代码进行了测试。调整以改变一些事情。您可以在此处查看代码:https://jsbin.com/zonedirogi/edit?html,js,console,output。主要的变化实际上是用 A 的最新版本代替 B,而是从 B 的最新版本更改为 A。然后它按预期工作。
仅供参考,您不需要使用 asObservable()
将 subject 转换为 observable。
var a = new Rx.BehaviorSubject(1);
var b = new Rx.BehaviorSubject(10);
a.subscribe(x => {
b.next(x * 10);
});
a.withLatestFrom(b).subscribe(x => {
console.log(x)
})
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
a.next(2);
}, 1000);