如何在 运行 时间 "merge" observables?
How to "merge" observables at run time?
我的用例是视图组中的点击侦听器:
每人View
发表感冒Observable<Click>
API.
视图组管理视图集合,并提供一个 Observable<ViewClick>
API 从子视图发出点击。
我希望每个子视图在添加到复合可观察对象时都被订阅,但前提是复合对象有订阅。如果没有,则只有在订阅复合时才应该订阅子项。
到目前为止,我的解决方案是:
PublishSubject<ViewClick> composite;
Map<View, Subscription> subscriptions;
void addSource(View view, Observable<Click> clicks) {
Subscription s = clicks.map(click -> new ViewClick(view, click)
.subscribe(composite::onNext);
subscriptions.put(view, s);
}
void removeSource(View v) {
Subscription s = subscriptions.get(v);
s.unsubscribe;
subscriptions.remove(v);
}
Observable<ViewClick> compositeClicks() {
return composite;
}
但是,这意味着 clicks
可观察对象不再冷,因为无论是否订阅了 compositeClicks
,它们都会被订阅。
有没有更好的方法?
您需要一个特殊的 "warm" Subject,例如 BufferUntilSubscriber
,它保留源 Observables 直到单个消费者订阅它。不幸的是,class 不是官方 API 的一部分,但是,出于同样的目的,2.0 将有一个官方 UnicastSubject
:
UnicastSubject<Observable<Integer>> inputs = UnicastSubject.create();
Observable<String> p = inputs
.flatMap(v -> v.map(u -> u.toString()))
.publish()
.autoConnect()
;
inputs.onNext(Observable.just(1)
.doOnSubscribe(s -> System.out.println("Subscribed to 1")));
inputs.onNext(Observable.just(2)
.doOnSubscribe(s -> System.out.println("Subscribed to 2")));
inputs.onNext(Observable.just(3)
.doOnSubscribe(s -> System.out.println("Subscribed to 3")));
System.out.println("Subscribing to p");
p.subscribe(System.out::println);
你的困难来自于 addSource/removeSource 不是反应性的,如果你把它们变成 Rx 流就会消失。
几种方法:
IObservable<IObservable<View>>
其中内部可观察对象将在 activation/add 上有一个 onNext
,并且在移除后将 onComplete
。
IObservable<View>
x2(一个用于添加,一个用于删除)
IObservable<ViewActivationEvent>
(具有 add/remove 种类的元组 + 视图实例)
(假设你的点击流可以从视图中提取出来,否则你需要提供一个元组view+clickStream来代替视图,用于激活部分)
下面是使用上面的 #2 实现流的方式,但所有方法都可行:
Subject<View> add,remove;
clicks = add.flatMap(view -> getClickObservable(view).takeUntil(remove.where(v => v == view)))
我的用例是视图组中的点击侦听器:
每人
View
发表感冒Observable<Click>
API.视图组管理视图集合,并提供一个
Observable<ViewClick>
API 从子视图发出点击。
我希望每个子视图在添加到复合可观察对象时都被订阅,但前提是复合对象有订阅。如果没有,则只有在订阅复合时才应该订阅子项。
到目前为止,我的解决方案是:
PublishSubject<ViewClick> composite;
Map<View, Subscription> subscriptions;
void addSource(View view, Observable<Click> clicks) {
Subscription s = clicks.map(click -> new ViewClick(view, click)
.subscribe(composite::onNext);
subscriptions.put(view, s);
}
void removeSource(View v) {
Subscription s = subscriptions.get(v);
s.unsubscribe;
subscriptions.remove(v);
}
Observable<ViewClick> compositeClicks() {
return composite;
}
但是,这意味着 clicks
可观察对象不再冷,因为无论是否订阅了 compositeClicks
,它们都会被订阅。
有没有更好的方法?
您需要一个特殊的 "warm" Subject,例如 BufferUntilSubscriber
,它保留源 Observables 直到单个消费者订阅它。不幸的是,class 不是官方 API 的一部分,但是,出于同样的目的,2.0 将有一个官方 UnicastSubject
:
UnicastSubject<Observable<Integer>> inputs = UnicastSubject.create();
Observable<String> p = inputs
.flatMap(v -> v.map(u -> u.toString()))
.publish()
.autoConnect()
;
inputs.onNext(Observable.just(1)
.doOnSubscribe(s -> System.out.println("Subscribed to 1")));
inputs.onNext(Observable.just(2)
.doOnSubscribe(s -> System.out.println("Subscribed to 2")));
inputs.onNext(Observable.just(3)
.doOnSubscribe(s -> System.out.println("Subscribed to 3")));
System.out.println("Subscribing to p");
p.subscribe(System.out::println);
你的困难来自于 addSource/removeSource 不是反应性的,如果你把它们变成 Rx 流就会消失。
几种方法:
IObservable<IObservable<View>>
其中内部可观察对象将在 activation/add 上有一个onNext
,并且在移除后将onComplete
。IObservable<View>
x2(一个用于添加,一个用于删除)IObservable<ViewActivationEvent>
(具有 add/remove 种类的元组 + 视图实例)
(假设你的点击流可以从视图中提取出来,否则你需要提供一个元组view+clickStream来代替视图,用于激活部分)
下面是使用上面的 #2 实现流的方式,但所有方法都可行:
Subject<View> add,remove;
clicks = add.flatMap(view -> getClickObservable(view).takeUntil(remove.where(v => v == view)))