如何共享创建的 Observable?
How to share an created Observable?
我创建了一个 Observable,它发出 ViewPager's positionOffset。
public Observable<Float> observable() {
return Observable.create(new Observable.OnSubscribe<Float>() {
@Override
public void call(final Subscriber<? super Float> subscriber) {
if (viewPager == null) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(new IllegalStateException("viewPager is null"));
}
return;
}
final ViewPager.OnPageChangeListener listener = new ViewPager.OnPageChangeListener() {
@Override
public void onPageScrolled(final int position,
final float positionOffset,
final int positionOffsetPixels) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(positionOffset);
}
}
@Override
public void onPageSelected(final int position) {
}
@Override
public void onPageScrollStateChanged(final int state) {
}
};
viewPager.addOnPageChangeListener(listener);
subscriber.add(Subscriptions.create(() -> viewPager.removeOnPageChangeListener(listener)));
}
});
}
目前有效。
但是每次订阅它都会创建一个新的 ViewPager.OnPageChangeListener
并将其添加到 ViewPager。
有没有一种方法可以让所有订阅共享同一个 Observable 而 ViewPager 只有一个 Listener?
您可以使用 Observable.share()
,如果您有多个订阅者并且不介意迟到的订阅者可能会丢失一些通知,这是一个不错的选择。
如果您希望所有订阅者看到相同的通知,请使用 Observable.publish()
- 它将 return ConnectableObservable
,一旦其 connect()
方法被调用就会开始发送项目。所以你可以做
connectable = originalObservable.publish();
connectable.subscribe(observer1);
connectable.subscribe(observer2);
connectable.connect();
您可以使用可连接的可观察对象。
例如下面的代码模拟了无限(永不完成)的事件流:
Observable<Float> o = Observable.concat(Observable.just(1f,2f,3f), Observable.never());
您可以使用 replay 和 refCount 创建一个可连接的可观察对象,它将缓存最后一个值:
Observable<Float> shared = o.replay(1).refCount();
第一个订阅者将创建可观察对象 "hot" 并强制其生成项目:
shared.subscribe(new Action1<Float>() {
@Override
public void call(Float t) {
System.out.println("o1:" + t);
}
});
Thread.sleep(1000);
第二个 Observable 可能会在一段时间后连接到它,并将从最新发出的项目开始:
shared.subscribe(new Action1<Float>() {
@Override
public void call(Float t) {
System.out.println("o2:" + t);
}
});
Thread.sleep(1000);
输出:
o1:1.0
o1:2.0
o1:3.0
o2:3.0
我创建了一个 Observable,它发出 ViewPager's positionOffset。
public Observable<Float> observable() {
return Observable.create(new Observable.OnSubscribe<Float>() {
@Override
public void call(final Subscriber<? super Float> subscriber) {
if (viewPager == null) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(new IllegalStateException("viewPager is null"));
}
return;
}
final ViewPager.OnPageChangeListener listener = new ViewPager.OnPageChangeListener() {
@Override
public void onPageScrolled(final int position,
final float positionOffset,
final int positionOffsetPixels) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(positionOffset);
}
}
@Override
public void onPageSelected(final int position) {
}
@Override
public void onPageScrollStateChanged(final int state) {
}
};
viewPager.addOnPageChangeListener(listener);
subscriber.add(Subscriptions.create(() -> viewPager.removeOnPageChangeListener(listener)));
}
});
}
目前有效。
但是每次订阅它都会创建一个新的 ViewPager.OnPageChangeListener
并将其添加到 ViewPager。
有没有一种方法可以让所有订阅共享同一个 Observable 而 ViewPager 只有一个 Listener?
您可以使用 Observable.share()
,如果您有多个订阅者并且不介意迟到的订阅者可能会丢失一些通知,这是一个不错的选择。
如果您希望所有订阅者看到相同的通知,请使用 Observable.publish()
- 它将 return ConnectableObservable
,一旦其 connect()
方法被调用就会开始发送项目。所以你可以做
connectable = originalObservable.publish();
connectable.subscribe(observer1);
connectable.subscribe(observer2);
connectable.connect();
您可以使用可连接的可观察对象。 例如下面的代码模拟了无限(永不完成)的事件流:
Observable<Float> o = Observable.concat(Observable.just(1f,2f,3f), Observable.never());
您可以使用 replay 和 refCount 创建一个可连接的可观察对象,它将缓存最后一个值:
Observable<Float> shared = o.replay(1).refCount();
第一个订阅者将创建可观察对象 "hot" 并强制其生成项目:
shared.subscribe(new Action1<Float>() {
@Override
public void call(Float t) {
System.out.println("o1:" + t);
}
});
Thread.sleep(1000);
第二个 Observable 可能会在一段时间后连接到它,并将从最新发出的项目开始:
shared.subscribe(new Action1<Float>() {
@Override
public void call(Float t) {
System.out.println("o2:" + t);
}
});
Thread.sleep(1000);
输出:
o1:1.0
o1:2.0
o1:3.0
o2:3.0