用另一个可观察对象改变一个可观察对象的订阅状态
Change the subscription state of an observable with another observable
如果您 运行 下面的代码,您将在控制台中看到无论我是否 subscribed
到 subject
,都会发送一个 xhr 请求。我不想在没有被替代时提出这些要求。
// npm install rxjs
const Rx = require('rxjs/Rx');
let subject = new Rx.BehaviorSubject(null)
Rx.Observable.timer(0, 1000).subscribe(i => someApiCall(i));
//at this point xhr request will be sent every second
function someApiCall(i){
// retrieve some data
console.log("xhr request sent")
subject.next(i);
}
//so here we are gonna subscribe to the subject, xhr made prior
//to this subscription are useless
let subscription;
setTimeout(() => subscription = subject.subscribe(i => console.log(i)),2500);
setTimeout(() => subscription.unsubscribe(),6000);
// now we are unsubscribing but the xhr req keep going
我使用 behaviorSubject 而不是直接订阅 observable 的原因是因为我想在重新订阅时立即获得最后一个 xhr 请求的最后一个值。
您应该使用 .ShareReplay(1)
而不是 BehaviourSubject
。这样你就可以保持惰性并缓存你的 xhr 调用的最后一个值。
const source = Rx.Observable.interval(1000)
.mergeMap(I => doXhr())
.shareReplay(1);
source.subscribe(console.log)
https://acutmore.jsbin.com/bepiho/2/edit?js,console
const { Observable } = Rx;
function someApiCall(i){
return Observable.create(observer => {
console.log("xhr request sent")
observer.next(i);
observer.complete();
});
}
const data = Rx.Observable.timer(0, 1000)
// map each value to the values of someApiCall
. mergeMap(i => someApiCall(i))
// share the values through a replaySubject
.publishReplay(1)
// Only connect to the source when there is at least one subscriber
.refCount();
data
.take(5)
.subscribe(v => console.log(v));
data
.take(1)
.subscribe(v => console.log(v));
值得注意的是,如果每个人在订阅时都执行 .take(1)
,这将无法按预期工作,因为每个人都将获得 ReplaySubject
中的值,然后在发出新的 xhrRequest 之前立即取消订阅。
即需要一些东西让它 alive 足够长的时间才能持续触发。
如果您 运行 下面的代码,您将在控制台中看到无论我是否 subscribed
到 subject
,都会发送一个 xhr 请求。我不想在没有被替代时提出这些要求。
// npm install rxjs
const Rx = require('rxjs/Rx');
let subject = new Rx.BehaviorSubject(null)
Rx.Observable.timer(0, 1000).subscribe(i => someApiCall(i));
//at this point xhr request will be sent every second
function someApiCall(i){
// retrieve some data
console.log("xhr request sent")
subject.next(i);
}
//so here we are gonna subscribe to the subject, xhr made prior
//to this subscription are useless
let subscription;
setTimeout(() => subscription = subject.subscribe(i => console.log(i)),2500);
setTimeout(() => subscription.unsubscribe(),6000);
// now we are unsubscribing but the xhr req keep going
我使用 behaviorSubject 而不是直接订阅 observable 的原因是因为我想在重新订阅时立即获得最后一个 xhr 请求的最后一个值。
您应该使用 .ShareReplay(1)
而不是 BehaviourSubject
。这样你就可以保持惰性并缓存你的 xhr 调用的最后一个值。
const source = Rx.Observable.interval(1000)
.mergeMap(I => doXhr())
.shareReplay(1);
source.subscribe(console.log)
https://acutmore.jsbin.com/bepiho/2/edit?js,console
const { Observable } = Rx;
function someApiCall(i){
return Observable.create(observer => {
console.log("xhr request sent")
observer.next(i);
observer.complete();
});
}
const data = Rx.Observable.timer(0, 1000)
// map each value to the values of someApiCall
. mergeMap(i => someApiCall(i))
// share the values through a replaySubject
.publishReplay(1)
// Only connect to the source when there is at least one subscriber
.refCount();
data
.take(5)
.subscribe(v => console.log(v));
data
.take(1)
.subscribe(v => console.log(v));
值得注意的是,如果每个人在订阅时都执行 .take(1)
,这将无法按预期工作,因为每个人都将获得 ReplaySubject
中的值,然后在发出新的 xhrRequest 之前立即取消订阅。
即需要一些东西让它 alive 足够长的时间才能持续触发。