如何共享创建的 Observable?

How to share an created Observable?

我创建了一个 Observable,它发出 ViewPager's positionOffset

public Observable<Float> observable() {

    return Observable.create(new Observable.OnSubscribe<Float>() {

        @Override
        public void call(final Subscriber<? super Float> subscriber) {

            if (viewPager == null) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(new IllegalStateException("viewPager is null"));
                }
                return;
            }

            final ViewPager.OnPageChangeListener listener = new ViewPager.OnPageChangeListener() {

                @Override
                public void onPageScrolled(final int position,
                                           final float positionOffset,
                                           final int positionOffsetPixels) {

                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(positionOffset);
                    }
                }

                @Override
                public void onPageSelected(final int position) {

                }

                @Override
                public void onPageScrollStateChanged(final int state) {

                }
            };
            viewPager.addOnPageChangeListener(listener);

            subscriber.add(Subscriptions.create(() -> viewPager.removeOnPageChangeListener(listener)));
        }
    });
}

目前有效。

但是每次订阅它都会创建一个新的 ViewPager.OnPageChangeListener 并将其添加到 ViewPager。

有没有一种方法可以让所有订阅共享同一个 Observable 而 ViewPager 只有一个 Listener?

您可以使用 Observable.share(),如果您有多个订阅者并且不介意迟到的订阅者可能会丢失一些通知,这是一个不错的选择。

如果您希望所有订阅者看到相同的通知,请使用 Observable.publish() - 它将 return ConnectableObservable,一旦其 connect() 方法被调用就会开始发送项目。所以你可以做

connectable = originalObservable.publish();
connectable.subscribe(observer1);
connectable.subscribe(observer2);
connectable.connect();

您可以使用可连接的可观察对象。 例如下面的代码模拟了无限(永不完成)的事件流:

Observable<Float> o = Observable.concat(Observable.just(1f,2f,3f), Observable.never());

您可以使用 replay 和 refCount 创建一个可连接的可观察对象,它将缓存最后一个值:

Observable<Float> shared = o.replay(1).refCount();

第一个订阅者将创建可观察对象 "hot" 并强制其生成项目:

shared.subscribe(new Action1<Float>() {

  @Override
  public void call(Float t) {
    System.out.println("o1:" + t);

  }

});
Thread.sleep(1000);

第二个 Observable 可能会在一段时间后连接到它,并将从最新发出的项目开始:

shared.subscribe(new Action1<Float>() {

  @Override
  public void call(Float t) {
    System.out.println("o2:" + t);

  }

});

Thread.sleep(1000);

输出:

o1:1.0
o1:2.0
o1:3.0
o2:3.0