如何使用 ConnectableObservable 预取然后使用不同订阅者处理的数据

How to use ConnectableObservable to prefetch and then use processed data with different subscriber

在我的应用程序中,我需要在应用程序启动时立即加载大量数据。此外,我需要在某些 fragment/activity.

中加载数据时接收事件

为此我正在使用 RxJava ConnectableObservable。我使用 replay() 因为我需要多个订阅者的相同数据。

伪代码:

 Observable.create(emitter -> {
            try {
                Data next = getDataFromDb();
                emitter.onNext(next);
                emitter.onCompleted();
            } catch (SQLiteException e) {
                emitter.onError(e);
            }
        }, Emitter.BackpressureMode.BUFFER)
        .toList()
        .compose(applySchedulers())
        .replay()

现在如果我想预取数据,我应该在应用程序 class 中 subscribe 然后在 Activity/Fragment 中使用 connect() 吗?

试试这个:

observable = Observable.create(emitter -> {
        try {
            Data next = getDataFromDb();
            emitter.onNext(next);
        } catch (SQLiteException e) {
            emitter.onError(e);
        }
    }, Emitter.BackpressureMode.BUFFER)
    .toList()
    .compose(applySchedulers())
    .replay(1)
    .autoConnect()
//start your prefetch
observable.subscribe()//you should better add some log to see the process

//In your Activity
observable.subscribe(/**Your Subscribe**/)// here you will get the replayed value

注意:

  1. 你应该保持你的 Observable 的同一个实例,否则你不能得到你重放的值

  2. 你应该使用 autoConnect() 的其他重载,例如 autoConnect(int numberOfSubscribers, @NonNull Consumer<? super Disposable> connection) 并为你的上游源获取一次性的(订阅 RxJava 1.x)。