Realm + Retrofit + RxJava:Concat 和 SubscribeOn
Realm + Retrofit + RxJava: Concat and SubscribeOn
我在使用 RxJava concat 运算符时遇到问题。我有两个可观察对象,第一个从服务器数据库发出结果,另一个从本地数据库发出结果,然后我连接 :
// Uses a Realm in the UI thread
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
// Uses Retrofit
Observable<MyResult> localObservable = mLocalDataSource.find(tId);
Observable.concat(localObservable, remoteObservable)
.doOnNext(result -> /* Do my stuff */)
.observeOn(AndroidSchedulers.mainThread())
.doOnError(throwable -> throwable.printStackTrace())
.subscribe()
所以这导致了我的问题,因为我没有使用 subscribeOn()
连接的可观察对象是 运行ning 在 AndroidScheduler.MainThread()
上,这不是 运行 远程和它启动 NetworkOnMainThreadException
.
如果我实现 subscribeOn(Schedulers.computation())
我会得到 Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created
因为当然 Observable 不在线程上 运行 领域实例确实存在。
我搜索了其他问题,但没有得到任何有用的信息,我检查了 realm: https://github.com/realm/realm-java/blob/master/examples/rxJavaExample/src/main/java/io/realm/examples/rxjava/retrofit/RetrofitExample.java 制作的示例,但奇怪的是,我看到改装 observable 没有订阅任何内容并且它有效。
为什么它适用于示例,而在我的代码中我不能这样做?有什么建议吗?
我相信你应该在正确的地方使用 subscribeOn()
。
// Uses a Realm in the UI thread
Observable<MyResult> realmObservable = mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread());
// Uses Retrofit
Observable<MyResult> retrofitObservable = mRetrofitDataSource.find(tId).subscribeOn(Subscribers.io());
Observable.concat(realmObservable, retrofitObservable)
.doOnNext(result -> /* Do my stuff */)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(throwable -> throwable.printStackTrace())
.subscribe()
看看它是否解决了您的问题。
您可以像下面这样连接本地和远程可观察对象:
// Uses a Realm in the UI thread
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
// Uses Retrofit
Observable<MyResult> localObservable = mLocalDataSource.find(tId);
Observable.concat(localObservable, remoteObservable).first()
.map(new Func1<MyResult, MyResult>() {
@Override
public myResult call(MyResult result) {
if (result == null) {
throw new IllegalArgumentException();
}
return result;
}
});
并像下面这样订阅:
CompositeSubscription mCompositeSubscription = new CompositeSubscription();
final Subscription subscription = mRepo.find(tId
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<MyResult>() {
@Override
public void onCompleted() {
// Completed
}
@Override
public void onError(Throwable e) {
// onError
}
@Override
public void onNext(MyResult result) {
//onSuccess
}
});
mCompositeSubscription.add(subscription);
你可以检查这个 repo 的 RxJava + Retrofit + Realm
https://github.com/savepopulation/wikilight
祝你好运!
而不是在 mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread())
使用 subscribeOn
就像说:
您可以使用 Observable.defer
例如:
class RealmDataSource{
fun find(id: String): Observable<MyResult> {
// Default pattern for loading data on a background thread
return Observable.defer{
val realm = Realm.getInstance()
val query = realm
.where(MyResult::class.java)
val flowable =
if (realm.isAutoRefresh) {
query
.findAllAsync()
.asFlowable()
.filter(RealmResults::isLoaded)
} else {
Flowable.just(query.findAll())
}
return@defer flowable
.toObservable()
}
}
然后使用将没有subscribeOn
// Uses a Realm
Observable<MyResult> realmObservable = mRealmDataSource.find(tId);
// Uses Retrofit
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
有关详细信息,请参阅 https://realm.io/blog/realm-java-0-87-0/
我在使用 RxJava concat 运算符时遇到问题。我有两个可观察对象,第一个从服务器数据库发出结果,另一个从本地数据库发出结果,然后我连接 :
// Uses a Realm in the UI thread
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
// Uses Retrofit
Observable<MyResult> localObservable = mLocalDataSource.find(tId);
Observable.concat(localObservable, remoteObservable)
.doOnNext(result -> /* Do my stuff */)
.observeOn(AndroidSchedulers.mainThread())
.doOnError(throwable -> throwable.printStackTrace())
.subscribe()
所以这导致了我的问题,因为我没有使用 subscribeOn()
连接的可观察对象是 运行ning 在 AndroidScheduler.MainThread()
上,这不是 运行 远程和它启动 NetworkOnMainThreadException
.
如果我实现 subscribeOn(Schedulers.computation())
我会得到 Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created
因为当然 Observable 不在线程上 运行 领域实例确实存在。
我搜索了其他问题,但没有得到任何有用的信息,我检查了 realm: https://github.com/realm/realm-java/blob/master/examples/rxJavaExample/src/main/java/io/realm/examples/rxjava/retrofit/RetrofitExample.java 制作的示例,但奇怪的是,我看到改装 observable 没有订阅任何内容并且它有效。
为什么它适用于示例,而在我的代码中我不能这样做?有什么建议吗?
我相信你应该在正确的地方使用 subscribeOn()
。
// Uses a Realm in the UI thread
Observable<MyResult> realmObservable = mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread());
// Uses Retrofit
Observable<MyResult> retrofitObservable = mRetrofitDataSource.find(tId).subscribeOn(Subscribers.io());
Observable.concat(realmObservable, retrofitObservable)
.doOnNext(result -> /* Do my stuff */)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(throwable -> throwable.printStackTrace())
.subscribe()
看看它是否解决了您的问题。
您可以像下面这样连接本地和远程可观察对象:
// Uses a Realm in the UI thread
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
// Uses Retrofit
Observable<MyResult> localObservable = mLocalDataSource.find(tId);
Observable.concat(localObservable, remoteObservable).first()
.map(new Func1<MyResult, MyResult>() {
@Override
public myResult call(MyResult result) {
if (result == null) {
throw new IllegalArgumentException();
}
return result;
}
});
并像下面这样订阅:
CompositeSubscription mCompositeSubscription = new CompositeSubscription();
final Subscription subscription = mRepo.find(tId
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<MyResult>() {
@Override
public void onCompleted() {
// Completed
}
@Override
public void onError(Throwable e) {
// onError
}
@Override
public void onNext(MyResult result) {
//onSuccess
}
});
mCompositeSubscription.add(subscription);
你可以检查这个 repo 的 RxJava + Retrofit + Realm https://github.com/savepopulation/wikilight
祝你好运!
而不是在 mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread())
使用 subscribeOn
就像说:
您可以使用 Observable.defer
例如:
class RealmDataSource{
fun find(id: String): Observable<MyResult> {
// Default pattern for loading data on a background thread
return Observable.defer{
val realm = Realm.getInstance()
val query = realm
.where(MyResult::class.java)
val flowable =
if (realm.isAutoRefresh) {
query
.findAllAsync()
.asFlowable()
.filter(RealmResults::isLoaded)
} else {
Flowable.just(query.findAll())
}
return@defer flowable
.toObservable()
}
}
然后使用将没有subscribeOn
// Uses a Realm
Observable<MyResult> realmObservable = mRealmDataSource.find(tId);
// Uses Retrofit
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
有关详细信息,请参阅 https://realm.io/blog/realm-java-0-87-0/