如何强制 publishReplay() 重新订阅?

How to force publishReplay() to resubscribe?

我是 RxJS 的新手,这是我的故事。我想要一个 "session" 可观察对象,这样新订阅者将始终获得当前会话,然后是所有新会话(如果它们可以出现)。所以我写了这样的东西:

var session = Rx.Observable.from([0,1,3])
  .do(x => console.log("Useful job"))
  .publishReplay(1)
  .refCount();

var subscr1 = session.subscribe( x => {
    console.log("sub1 = " +x)
    //subscr1.unsubscribe();
})

console.log("Completed");
subscr1.unsubscribe();

session.subscribe( x => {
  console.log("sub2 = " +x)
});

输出为:

Useful job
sub1 = 0
Useful job
sub1 = 1
Useful job
sub1 = 3
Completed
sub2 = 3

为什么sub2订阅了没有有用的工作?我希望它完全冷!

.publishReplay() 的第一个参数是它将重放的项目数,因此如果您知道总是只能期望 3 个,则可以使用 .publishReplay(3)

如果你想重播整个序列,你可以先用 toArray() 收集它的所有项目,将数组存储在 ReplaySubject 中,然后将数组展平为单个值。

var session = Rx.Observable.from([0,1,3])
  .toArray()
  .publishReplay(1)
  .refCount()
  .concatAll();

请注意,toArray() 在其源完成之前不会发出任何内容。

编辑:我现在知道问题出在哪里了。

您的方法是正确的,但问题出在 Observable.frompublishReplay() 中使用的主题。源 Observable 发出三个项目并发送 complete 通知。当 Subject 收到 completeerror 通知时,它会将自己标记为 stopped 并且不会再重新发送任何项目。这正是您的示例中发生的情况。

如果您在不发送 complete 的情况下手动发出值,它将按您的预期工作。

// var session = Rx.Observable.from([0,1,3])
var session = Rx.Observable.create(subscriber => {
    subscriber.next(0);
    subscriber.next(1);
    subscriber.next(3);
  })
  .do(x => console.log("Useful job"))
  .publishReplay(1)
  .refCount();

这会打印到控制台:

Useful job
sub1 = 0
Useful job
sub1 = 1
Useful job
sub1 = 3
Completed
sub2 = 3
Useful job
sub2 = 0
Useful job
sub2 = 1
Useful job
sub2 = 3

看到同样的问题:

终于找到了解决办法。

将 .publishReplay(1) 替换为 .multicast(() => new Rx.ReplaySubject(1))。所以基本上我用 subjectFactory 替换了 subject。

然后它按预期工作。所以当大家都退订时,它变得绝对冷淡。