用另一个可观察对象改变一个可观察对象的订阅状态

Change the subscription state of an observable with another observable

如果您 运行 下面的代码,您将在控制台中看到无论我是否 subscribedsubject,都会发送一个 xhr 请求。我不想在没有被替代时提出这些要求。

// npm install rxjs
const Rx = require('rxjs/Rx');

let subject = new Rx.BehaviorSubject(null)
Rx.Observable.timer(0, 1000).subscribe(i => someApiCall(i));
//at this point xhr request will be sent every second

function someApiCall(i){
    // retrieve some data
    console.log("xhr request sent")
    subject.next(i);
}

//so here we are gonna subscribe to the subject, xhr made prior 
//to this subscription are useless
let subscription;
setTimeout(() => subscription = subject.subscribe(i => console.log(i)),2500);
setTimeout(() => subscription.unsubscribe(),6000);
// now we are unsubscribing but the xhr req keep going

我使用 behaviorSubject 而不是直接订阅 observable 的原因是因为我想在重新订阅时立即获得最后一个 xhr 请求的最后一个值。

您应该使用 .ShareReplay(1) 而不是 BehaviourSubject。这样你就可以保持惰性并缓存你的 xhr 调用的最后一个值。

const source = Rx.Observable.interval(1000)
  .mergeMap(I => doXhr())
  .shareReplay(1);

source.subscribe(console.log)

https://acutmore.jsbin.com/bepiho/2/edit?js,console

const { Observable } = Rx;

function someApiCall(i){
  return Observable.create(observer => {
    console.log("xhr request sent")
    observer.next(i);
    observer.complete();
  });
}

const data = Rx.Observable.timer(0, 1000)
  // map each value to the values of someApiCall
  . mergeMap(i => someApiCall(i))
  // share the values through a replaySubject
  .publishReplay(1)
  // Only connect to the source when there is at least one subscriber
  .refCount();

data
  .take(5)
  .subscribe(v => console.log(v));

data
  .take(1)
  .subscribe(v => console.log(v));

值得注意的是,如果每个人在订阅时都执行 .take(1),这将无法按预期工作,因为每个人都将获得 ReplaySubject 中的值,然后在发出新的 xhrRequest 之前立即取消订阅。

即需要一些东西让它 alive 足够长的时间才能持续触发。