Rxjs - 订阅相互依赖的可观察对象

Rxjs - Subscribing to interdependent observables

我正在学习 angular 2 和 rxjs。我有 3 个变量,A B C.

我正在尝试设置可观察对象,以便:当 A 更新时,B 和 C 将自动更新。当 B 更新时,C 将自动更新。我尝试了两种设置,但都不尽如人意。

我如何设置我的可观察对象/订阅,以便在更新 A 时,C 只会使用 A 和 B 的最新值更新一次?

编辑 - 添加代码

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();

A_Observable.subscribe(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  B.next(newB);
});

// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable);
combineAllObservable.subscribe(function(data) {
  console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);

// SATISFACTORY RESULT
setTimeout(function(){
  console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

代码之所以如此,是因为默认情况下 RxJs 运行s 代码是同步的。 withLatestFrom 案例的事件链:

  1. 您推送新的 A 值:A.next(2);
  2. RxjS看到A有两种用法:
    1. B-订阅(先到先得)
    2. withLatestFrom(第二)
  3. RxJs 调用 B-订阅
    1. 它推送 B
    2. 的新值
    3. RxjS看到有B的用法:C-subscription
    4. RxJs 调用 C-订阅(旧值为 A!)
  4. A 的新值被拉入 withLatestFromC- 订阅未被触发(因为它仅由 B 更新触发)

可能的解决方案是添加debounceTime(0)。当withLatestFrom中的值已经更新时,它会强制C-订阅运行异步。

有关 RxJs Scheduler 的更多信息 google。

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();

A_Observable.subscribe(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  B.next(newB);
});

// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.debounceTime(0).withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable).debounceTime(0);
combineAllObservable.subscribe(function(data) {
  console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);

// SATISFACTORY RESULT
setTimeout(function(){
  console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

我想补充两点:

  1. withLatestFrom 方法在您的情况下更好,因为依赖图看起来像 C -> B -> A 并且当 A 更新时 B 也会更新。如果 AB 是独立的,那么 combineLatest 将是更好的选择。
  2. Subject B 不需要。你可以像这样重写代码(没有debounceTime!)

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();

var B_Observable = A_Observable.map(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  return newB;
});

var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

好的,所以我用你的代码进行了测试。调整以改变一些事情。您可以在此处查看代码:https://jsbin.com/zonedirogi/edit?html,js,console,output。主要的变化实际上是用 A 的最新版本代替 B,而是从 B 的最新版本更改为 A。然后它按预期工作。

仅供参考,您不需要使用 asObservable() 将 subject 转换为 observable。

var a = new Rx.BehaviorSubject(1);
var b = new Rx.BehaviorSubject(10);

a.subscribe(x => {
  b.next(x * 10);
});

a.withLatestFrom(b).subscribe(x => {
  console.log(x)
})

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  a.next(2);
}, 1000);