Subjects 订阅 Observables

Subjects subscribing to observables

我有以下简单代码:

let a: Observable<any> = Observable.from([1,2,3,4,5]).share();
let b: Subject<any> = new Subject();

a.subscribe(b)
a.subscribe(value => console.log("New observable value: " + value));
b.subscribe(value => console.log("New subject value: " + value));

b.next("value1");
b.next("value2");

我看到的唯一控制台行是数组中每个值带有 "New observable value: " 的行。如果我注释掉第 3 行,那么我会得到所有 "New observable value" 行和 "New subject value" 行,但是受试者当然不会得到可观察对象(数组的一部分)省略的任何值。

如此基本的问题 - 这是为什么?为什么我的主题看不到数组元素 1、2、3、4、5 中的任何一个?它订阅了 observable,所以它应该得到这些事件(share() 应该广播给所有订阅者),但这似乎是我的误解。

我在这里寻找的是对主题进行单一订阅,并查看来自 observable 的所有值以及通过调用 next() 发出的任何值。是我的代码有误,还是我试图以错误的方式做事?

这是主题及其内部状态的经典问题。

当您调用 a.subscribe(b) 时,Subject 会收到来自源 Observable a 的所有项目以及将其标记为已停止的 complete 通知。您可以在源代码中看到它:https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L86
然后所有其他信号都将被忽略,您可以在此处亲眼看到:https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L56

这意味着这个 Subject 实例将永远不会再发出任何东西。它永远完成了。您可以看到 complete 信号实际上是使用 do() 运算符发送的:

a.do(val => console.log('do: ' + val), null, () => console.log('do: complete'))
  .subscribe(b)

查看演示:https://jsbin.com/bepupor/1/edit?js,console

因此,在您的情况下,您希望保护主题 b 免于收到 complete 通知。您可以通过手动获取并仅推送 next 信号来做到这一点:

a.subscribe(val => b.next(val));

查看演示:https://jsbin.com/bepupor/4/edit?js,console

另一个与您的问题非常相似的问题:

您的基本问题是您的订阅顺序,当您订阅您的 console.log 时,Subject 已经收到数据。 (查看下面的代码示例 运行)

第二个问题,为什么 next("value1") 不起作用,是因为 a.subscribe(b) 也会在 [=20= 发出后调用 b 上的 .complete() ].

let a = Rx.Observable.from([1,2,3,4,5]).share();
let b = new Rx.Subject();

b.subscribe(value => console.log("New subject value: " + value));
a.subscribe(value => console.log("New observable value: " + value));
a.subscribe(b);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>


一个 解决此问题的方法:不要订阅完整的 Subject,而只订阅 next-call。不过请记住,这只是 一种 方法,根据您的具体情况,可能会有更好的解决方案。

let a = Rx.Observable.from([1,2,3,4,5]).share();
let b = new Rx.Subject();

b.subscribe(value => console.log("New subject value: " + value));
a.subscribe(value => console.log("New observable value: " + value));
a.subscribe(data => b.next(data));

b.next("value1");
b.next("value2");
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>