如何强制 publishReplay() 重新订阅?
How to force publishReplay() to resubscribe?
我是 RxJS 的新手,这是我的故事。我想要一个 "session" 可观察对象,这样新订阅者将始终获得当前会话,然后是所有新会话(如果它们可以出现)。所以我写了这样的东西:
var session = Rx.Observable.from([0,1,3])
.do(x => console.log("Useful job"))
.publishReplay(1)
.refCount();
var subscr1 = session.subscribe( x => {
console.log("sub1 = " +x)
//subscr1.unsubscribe();
})
console.log("Completed");
subscr1.unsubscribe();
session.subscribe( x => {
console.log("sub2 = " +x)
});
输出为:
Useful job
sub1 = 0
Useful job
sub1 = 1
Useful job
sub1 = 3
Completed
sub2 = 3
为什么sub2订阅了没有有用的工作?我希望它完全冷!
.publishReplay()
的第一个参数是它将重放的项目数,因此如果您知道总是只能期望 3 个,则可以使用 .publishReplay(3)
。
如果你想重播整个序列,你可以先用 toArray()
收集它的所有项目,将数组存储在 ReplaySubject
中,然后将数组展平为单个值。
var session = Rx.Observable.from([0,1,3])
.toArray()
.publishReplay(1)
.refCount()
.concatAll();
请注意,toArray()
在其源完成之前不会发出任何内容。
编辑:我现在知道问题出在哪里了。
您的方法是正确的,但问题出在 Observable.from
和 publishReplay()
中使用的主题。源 Observable 发出三个项目并发送 complete
通知。当 Subject 收到 complete
或 error
通知时,它会将自己标记为 stopped
并且不会再重新发送任何项目。这正是您的示例中发生的情况。
如果您在不发送 complete
的情况下手动发出值,它将按您的预期工作。
// var session = Rx.Observable.from([0,1,3])
var session = Rx.Observable.create(subscriber => {
subscriber.next(0);
subscriber.next(1);
subscriber.next(3);
})
.do(x => console.log("Useful job"))
.publishReplay(1)
.refCount();
这会打印到控制台:
Useful job
sub1 = 0
Useful job
sub1 = 1
Useful job
sub1 = 3
Completed
sub2 = 3
Useful job
sub2 = 0
Useful job
sub2 = 1
Useful job
sub2 = 3
看到同样的问题:
终于找到了解决办法。
将 .publishReplay(1) 替换为 .multicast(() => new Rx.ReplaySubject(1))。所以基本上我用 subjectFactory 替换了 subject。
然后它按预期工作。所以当大家都退订时,它变得绝对冷淡。
我是 RxJS 的新手,这是我的故事。我想要一个 "session" 可观察对象,这样新订阅者将始终获得当前会话,然后是所有新会话(如果它们可以出现)。所以我写了这样的东西:
var session = Rx.Observable.from([0,1,3])
.do(x => console.log("Useful job"))
.publishReplay(1)
.refCount();
var subscr1 = session.subscribe( x => {
console.log("sub1 = " +x)
//subscr1.unsubscribe();
})
console.log("Completed");
subscr1.unsubscribe();
session.subscribe( x => {
console.log("sub2 = " +x)
});
输出为:
Useful job
sub1 = 0
Useful job
sub1 = 1
Useful job
sub1 = 3
Completed
sub2 = 3
为什么sub2订阅了没有有用的工作?我希望它完全冷!
.publishReplay()
的第一个参数是它将重放的项目数,因此如果您知道总是只能期望 3 个,则可以使用 .publishReplay(3)
。
如果你想重播整个序列,你可以先用 toArray()
收集它的所有项目,将数组存储在 ReplaySubject
中,然后将数组展平为单个值。
var session = Rx.Observable.from([0,1,3])
.toArray()
.publishReplay(1)
.refCount()
.concatAll();
请注意,toArray()
在其源完成之前不会发出任何内容。
编辑:我现在知道问题出在哪里了。
您的方法是正确的,但问题出在 Observable.from
和 publishReplay()
中使用的主题。源 Observable 发出三个项目并发送 complete
通知。当 Subject 收到 complete
或 error
通知时,它会将自己标记为 stopped
并且不会再重新发送任何项目。这正是您的示例中发生的情况。
如果您在不发送 complete
的情况下手动发出值,它将按您的预期工作。
// var session = Rx.Observable.from([0,1,3])
var session = Rx.Observable.create(subscriber => {
subscriber.next(0);
subscriber.next(1);
subscriber.next(3);
})
.do(x => console.log("Useful job"))
.publishReplay(1)
.refCount();
这会打印到控制台:
Useful job
sub1 = 0
Useful job
sub1 = 1
Useful job
sub1 = 3
Completed
sub2 = 3
Useful job
sub2 = 0
Useful job
sub2 = 1
Useful job
sub2 = 3
看到同样的问题:
终于找到了解决办法。
将 .publishReplay(1) 替换为 .multicast(() => new Rx.ReplaySubject(1))。所以基本上我用 subjectFactory 替换了 subject。
然后它按预期工作。所以当大家都退订时,它变得绝对冷淡。