ConnectableObservable 与 flatMap() 自引用?
ConnectableObservable vs flatMap() self-references?
我对 ConnectableObservable
的用例很好奇,并认为将昂贵的排放从冷可观察量(如从数据库查询)转变为热排放可能会有所帮助。这样可以避免昂贵的重播,并且可以将一组发射推送给所有运营商和订阅者。
然而,经过一些思想实验后,我担心 flatMaps 中的自引用可能会导致问题。
例如,假设我通过 ConnectableObservable
发出值 1 到 10。但是我flatMap()
每个值都是所有值的总和,然后减去当前值。
ConnectableObservable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.publish();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
source.connect();
我希望我能得到这个输出。
1
2
3
4
5
6
7
8
9
10
SUM - i: 54
SUM - i: 53
SUM - i: 52
SUM - i: 51
SUM - i: 50
SUM - i: 49
SUM - i: 48
SUM - i: 47
SUM - i: 46
SUM - i: 45
但是我得到了这个。
1
2
3
4
5
6
7
8
9
10
SUM - i: 53
SUM - i: 50
SUM - i: 46
SUM - i: 41
SUM - i: 35
SUM - i: 28
SUM - i: 20
SUM - i: 11
SUM - i: 1
SUM - i: -10
正如我担心的那样,flatMap()
看起来需要重播这些值,因为它无法处理源的热顺序性质。因此,如果我使用 cache()
运算符,则一切正常,因为缓存值将为每个 flatMap()
运算符重播。
Observable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.cache();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
这些是我的问题:
这个 ConnectableObservable
过程到底发生了什么?它看起来是确定性的,那么它是如何得出这些值的呢?
可以肯定地说 ConnectableObervable
对任何使用它的运算符的自引用都是危险的吗?在这种情况下,cache()
应该是热门运营商吗?
What exactly happened with this ConnectableObservable process? It looks to be deterministic so how did it come up with those values?
这个设置是不直观的,但实际情况是,在创建各自的起始值之前,内部和不存在,并且它们中的每一个在创建后只能看到原始序列的一个元素。例如,对于 1,内部求和将仅获取从 2 到 10 的事件。
Is it safe to say that ConnectableObervable can be dangerous to self-reference in any operators that use it? And cache() should be the go-to hot operator in these circumstances?
问题不在于 ConnectableObservable
,而在于 publish
,它对时间敏感且 Subscriber
敏感:谁在场接收事件,谁不在场就不会接收追溯任何事情。
我对 ConnectableObservable
的用例很好奇,并认为将昂贵的排放从冷可观察量(如从数据库查询)转变为热排放可能会有所帮助。这样可以避免昂贵的重播,并且可以将一组发射推送给所有运营商和订阅者。
然而,经过一些思想实验后,我担心 flatMaps 中的自引用可能会导致问题。
例如,假设我通过 ConnectableObservable
发出值 1 到 10。但是我flatMap()
每个值都是所有值的总和,然后减去当前值。
ConnectableObservable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.publish();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
source.connect();
我希望我能得到这个输出。
1
2
3
4
5
6
7
8
9
10
SUM - i: 54
SUM - i: 53
SUM - i: 52
SUM - i: 51
SUM - i: 50
SUM - i: 49
SUM - i: 48
SUM - i: 47
SUM - i: 46
SUM - i: 45
但是我得到了这个。
1
2
3
4
5
6
7
8
9
10
SUM - i: 53
SUM - i: 50
SUM - i: 46
SUM - i: 41
SUM - i: 35
SUM - i: 28
SUM - i: 20
SUM - i: 11
SUM - i: 1
SUM - i: -10
正如我担心的那样,flatMap()
看起来需要重播这些值,因为它无法处理源的热顺序性质。因此,如果我使用 cache()
运算符,则一切正常,因为缓存值将为每个 flatMap()
运算符重播。
Observable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.cache();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
这些是我的问题:
这个
ConnectableObservable
过程到底发生了什么?它看起来是确定性的,那么它是如何得出这些值的呢?可以肯定地说
ConnectableObervable
对任何使用它的运算符的自引用都是危险的吗?在这种情况下,cache()
应该是热门运营商吗?
What exactly happened with this ConnectableObservable process? It looks to be deterministic so how did it come up with those values?
这个设置是不直观的,但实际情况是,在创建各自的起始值之前,内部和不存在,并且它们中的每一个在创建后只能看到原始序列的一个元素。例如,对于 1,内部求和将仅获取从 2 到 10 的事件。
Is it safe to say that ConnectableObervable can be dangerous to self-reference in any operators that use it? And cache() should be the go-to hot operator in these circumstances?
问题不在于 ConnectableObservable
,而在于 publish
,它对时间敏感且 Subscriber
敏感:谁在场接收事件,谁不在场就不会接收追溯任何事情。