意外线程上的 RxAndroid Observable 运行
RxAndroid Observable running on unexpected thread
我正在尝试创建一个 Observable
以便它会每隔一段时间从网络加载一些数据,每当用户刷新页面时。这是我到目前为止的要点:
PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.flatMap(t -> {
// network operations that eventually return a value
// these operations are not observables themselves
// they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
// update ui with data
}, error -> {
// do something with error
});
稍后在刷新回调中我有:
refreshSubject.onNext(0L);
它在间隔上正常运行,但是当我刷新时,它会爆炸 NetworkOnMainThreadException
。我以为我用 subscribeOn
/observeOn
处理了这个问题。我错过了什么?另外,为什么从间隔触发 Observer
时不会导致崩溃?
您必须将 subscribeOn(Schedulers.io())
更改为 observeOn(Schedulers.io())
并将其移动到您的平面地图上。
这样做的原因是你的refreshSubject
是一个PublishSubject,它是一个Observable,一个Observer。
因为这个 PublishSubject 的 onNext()
在结果被传送到你的订阅之前首先在实习生 Observable 内部被调用。
这也是它在您仅使用 Observable 时起作用的原因(以及 interval
默认情况下始终订阅计算线程的事实)。
只需检查这两个片段的输出:
Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.observeOn(Schedulers.io())
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
// do something with error
});
对
Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
// do something with error
});
我正在尝试创建一个 Observable
以便它会每隔一段时间从网络加载一些数据,每当用户刷新页面时。这是我到目前为止的要点:
PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.flatMap(t -> {
// network operations that eventually return a value
// these operations are not observables themselves
// they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
// update ui with data
}, error -> {
// do something with error
});
稍后在刷新回调中我有:
refreshSubject.onNext(0L);
它在间隔上正常运行,但是当我刷新时,它会爆炸 NetworkOnMainThreadException
。我以为我用 subscribeOn
/observeOn
处理了这个问题。我错过了什么?另外,为什么从间隔触发 Observer
时不会导致崩溃?
您必须将 subscribeOn(Schedulers.io())
更改为 observeOn(Schedulers.io())
并将其移动到您的平面地图上。
这样做的原因是你的refreshSubject
是一个PublishSubject,它是一个Observable,一个Observer。
因为这个 PublishSubject 的 onNext()
在结果被传送到你的订阅之前首先在实习生 Observable 内部被调用。
这也是它在您仅使用 Observable 时起作用的原因(以及 interval
默认情况下始终订阅计算线程的事实)。
只需检查这两个片段的输出:
Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.observeOn(Schedulers.io())
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
// do something with error
});
对
Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
// do something with error
});