Realm + Retrofit + RxJava:Concat 和 SubscribeOn

Realm + Retrofit + RxJava: Concat and SubscribeOn

我在使用 RxJava concat 运算符时遇到问题。我有两个可观察对象,第一个从服务器数据库发出结果,另一个从本地数据库发出结果,然后我连接 :

// Uses a Realm in the UI thread
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);

// Uses Retrofit
Observable<MyResult> localObservable = mLocalDataSource.find(tId);

Observable.concat(localObservable, remoteObservable)
    .doOnNext(result -> /* Do my stuff */)
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError(throwable -> throwable.printStackTrace())
    .subscribe()

所以这导致了我的问题,因为我没有使用 subscribeOn() 连接的可观察对象是 运行ning 在 AndroidScheduler.MainThread() 上,这不是 运行 远程和它启动 NetworkOnMainThreadException.

如果我实现 subscribeOn(Schedulers.computation()) 我会得到 Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created 因为当然 Observable 不在线程上 运行 领域实例确实存在。

我搜索了其他问题,但没有得到任何有用的信息,我检查了 realm: https://github.com/realm/realm-java/blob/master/examples/rxJavaExample/src/main/java/io/realm/examples/rxjava/retrofit/RetrofitExample.java 制作的示例,但奇怪的是,我看到改装 observable 没有订阅任何内容并且它有效。

为什么它适用于示例,而在我的代码中我不能这样做?有什么建议吗?

我相信你应该在正确的地方使用 subscribeOn()

// Uses a Realm in the UI thread
Observable<MyResult> realmObservable = mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread());

// Uses Retrofit
Observable<MyResult> retrofitObservable = mRetrofitDataSource.find(tId).subscribeOn(Subscribers.io());

Observable.concat(realmObservable, retrofitObservable)
    .doOnNext(result -> /* Do my stuff */)
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError(throwable -> throwable.printStackTrace())
    .subscribe()

看看它是否解决了您的问题。

您可以像下面这样连接本地和远程可观察对象:

// Uses a Realm in the UI thread
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);

// Uses Retrofit
Observable<MyResult> localObservable = mLocalDataSource.find(tId);

Observable.concat(localObservable, remoteObservable).first()
                .map(new Func1<MyResult, MyResult>() {
                    @Override
                    public myResult call(MyResult result) {
                        if (result == null) {
                            throw new IllegalArgumentException();
                        }
                        return result;
                    }
                });

并像下面这样订阅:

CompositeSubscription mCompositeSubscription = new CompositeSubscription();
final Subscription subscription = mRepo.find(tId
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<MyResult>() {
                    @Override
                    public void onCompleted() {
                        // Completed
                    }

                    @Override
                    public void onError(Throwable e) {
                        // onError
                    }

                    @Override
                    public void onNext(MyResult result) {
                        //onSuccess
                    }
                });
mCompositeSubscription.add(subscription);

你可以检查这个 repo 的 RxJava + Retrofit + Realm https://github.com/savepopulation/wikilight

祝你好运!

而不是在 mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread()) 使用 subscribeOn 就像说:

您可以使用 Observable.defer 例如:

class RealmDataSource{
fun find(id: String): Observable<MyResult> {
// Default pattern for loading data on a background thread
return Observable.defer{
                val realm = Realm.getInstance()

                val query = realm
                    .where(MyResult::class.java)

                val flowable =
                    if (realm.isAutoRefresh) {
                        query
                            .findAllAsync()
                            .asFlowable()
                            .filter(RealmResults::isLoaded)
                    } else {
                        Flowable.just(query.findAll())
                    }

                return@defer flowable
                    .toObservable()
            }
}

然后使用将没有subscribeOn

// Uses a Realm
Observable<MyResult> realmObservable = mRealmDataSource.find(tId);

// Uses Retrofit
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);

有关详细信息,请参阅 https://realm.io/blog/realm-java-0-87-0/