Subjects 订阅 Observables
Subjects subscribing to observables
我有以下简单代码:
let a: Observable<any> = Observable.from([1,2,3,4,5]).share();
let b: Subject<any> = new Subject();
a.subscribe(b)
a.subscribe(value => console.log("New observable value: " + value));
b.subscribe(value => console.log("New subject value: " + value));
b.next("value1");
b.next("value2");
我看到的唯一控制台行是数组中每个值带有 "New observable value: " 的行。如果我注释掉第 3 行,那么我会得到所有 "New observable value" 行和 "New subject value" 行,但是受试者当然不会得到可观察对象(数组的一部分)省略的任何值。
如此基本的问题 - 这是为什么?为什么我的主题看不到数组元素 1、2、3、4、5 中的任何一个?它订阅了 observable,所以它应该得到这些事件(share()
应该广播给所有订阅者),但这似乎是我的误解。
我在这里寻找的是对主题进行单一订阅,并查看来自 observable 的所有值以及通过调用 next()
发出的任何值。是我的代码有误,还是我试图以错误的方式做事?
这是主题及其内部状态的经典问题。
当您调用 a.subscribe(b)
时,Subject 会收到来自源 Observable a
的所有项目以及将其标记为已停止的 complete
通知。您可以在源代码中看到它:https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L86
然后所有其他信号都将被忽略,您可以在此处亲眼看到:https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L56
这意味着这个 Subject 实例将永远不会再发出任何东西。它永远完成了。您可以看到 complete
信号实际上是使用 do()
运算符发送的:
a.do(val => console.log('do: ' + val), null, () => console.log('do: complete'))
.subscribe(b)
查看演示:https://jsbin.com/bepupor/1/edit?js,console
因此,在您的情况下,您希望保护主题 b
免于收到 complete
通知。您可以通过手动获取并仅推送 next
信号来做到这一点:
a.subscribe(val => b.next(val));
查看演示:https://jsbin.com/bepupor/4/edit?js,console
另一个与您的问题非常相似的问题:
您的基本问题是您的订阅顺序,当您订阅您的 console.log
时,Subject
已经收到数据。 (查看下面的代码示例 运行)
第二个问题,为什么 next("value1")
不起作用,是因为 a.subscribe(b)
也会在 [=20= 发出后调用 b
上的 .complete()
].
let a = Rx.Observable.from([1,2,3,4,5]).share();
let b = new Rx.Subject();
b.subscribe(value => console.log("New subject value: " + value));
a.subscribe(value => console.log("New observable value: " + value));
a.subscribe(b);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
一个 解决此问题的方法:不要订阅完整的 Subject
,而只订阅 next
-call。不过请记住,这只是 一种 方法,根据您的具体情况,可能会有更好的解决方案。
let a = Rx.Observable.from([1,2,3,4,5]).share();
let b = new Rx.Subject();
b.subscribe(value => console.log("New subject value: " + value));
a.subscribe(value => console.log("New observable value: " + value));
a.subscribe(data => b.next(data));
b.next("value1");
b.next("value2");
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
我有以下简单代码:
let a: Observable<any> = Observable.from([1,2,3,4,5]).share();
let b: Subject<any> = new Subject();
a.subscribe(b)
a.subscribe(value => console.log("New observable value: " + value));
b.subscribe(value => console.log("New subject value: " + value));
b.next("value1");
b.next("value2");
我看到的唯一控制台行是数组中每个值带有 "New observable value: " 的行。如果我注释掉第 3 行,那么我会得到所有 "New observable value" 行和 "New subject value" 行,但是受试者当然不会得到可观察对象(数组的一部分)省略的任何值。
如此基本的问题 - 这是为什么?为什么我的主题看不到数组元素 1、2、3、4、5 中的任何一个?它订阅了 observable,所以它应该得到这些事件(share()
应该广播给所有订阅者),但这似乎是我的误解。
我在这里寻找的是对主题进行单一订阅,并查看来自 observable 的所有值以及通过调用 next()
发出的任何值。是我的代码有误,还是我试图以错误的方式做事?
这是主题及其内部状态的经典问题。
当您调用 a.subscribe(b)
时,Subject 会收到来自源 Observable a
的所有项目以及将其标记为已停止的 complete
通知。您可以在源代码中看到它:https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L86
然后所有其他信号都将被忽略,您可以在此处亲眼看到:https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L56
这意味着这个 Subject 实例将永远不会再发出任何东西。它永远完成了。您可以看到 complete
信号实际上是使用 do()
运算符发送的:
a.do(val => console.log('do: ' + val), null, () => console.log('do: complete'))
.subscribe(b)
查看演示:https://jsbin.com/bepupor/1/edit?js,console
因此,在您的情况下,您希望保护主题 b
免于收到 complete
通知。您可以通过手动获取并仅推送 next
信号来做到这一点:
a.subscribe(val => b.next(val));
查看演示:https://jsbin.com/bepupor/4/edit?js,console
另一个与您的问题非常相似的问题:
您的基本问题是您的订阅顺序,当您订阅您的 console.log
时,Subject
已经收到数据。 (查看下面的代码示例 运行)
第二个问题,为什么 next("value1")
不起作用,是因为 a.subscribe(b)
也会在 [=20= 发出后调用 b
上的 .complete()
].
let a = Rx.Observable.from([1,2,3,4,5]).share();
let b = new Rx.Subject();
b.subscribe(value => console.log("New subject value: " + value));
a.subscribe(value => console.log("New observable value: " + value));
a.subscribe(b);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
一个 解决此问题的方法:不要订阅完整的 Subject
,而只订阅 next
-call。不过请记住,这只是 一种 方法,根据您的具体情况,可能会有更好的解决方案。
let a = Rx.Observable.from([1,2,3,4,5]).share();
let b = new Rx.Subject();
b.subscribe(value => console.log("New subject value: " + value));
a.subscribe(value => console.log("New observable value: " + value));
a.subscribe(data => b.next(data));
b.next("value1");
b.next("value2");
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>