Schedulers.io() 没有返回到主线程
Schedulers.io() not returning to main thread
我正在使用 RxParse 来解析查询的异步加载,但是当我使用 subscribeOn(Schedulers.io()) 订阅我的可观察对象时,我的 onCompleted 方法从未在主线程上被调用。取而代之的是,我的 onCompleted 方法在工作线程池内部被调用。如果我使用 observeOn(AndroidSchedulers.mainThread) 一切都会正常工作,但我的 onNextMethod 也会在主线程上调用,我不想要它。
我的代码有问题吗?
我的代码有什么问题吗?
ParseObservable.find(myQuery)
.map(myMapFunc())
.subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
.subscribe(
new Subscriber<MyObj>() {
@Override
public void onError(Throwable e) {
Log.e("error","error",e);
}
@Override
public void onNext(T t) {
// ... worker thread (but here is ok)
}
public void onCompleted() {
// ... worker thread again instead of mainThread
}
}
)
);
首先你要明白subscribeOn()
和observeOn()
的区别。这是两个完全不同的运算符,它们影响 Rx 链的不同部分。
subscribeOn()
指定 Observable 将在哪里工作。它不会影响 onNext()
、onError()
和 onComplete()
的执行位置。
observeOn()
指定执行回调(例如 onNext()
)的位置。它不会影响您的 Observable 在哪里工作。
所有回调将发生在同一个线程上。您不能通过任何 RxJava API 指定某些回调发生在一个线程上,而某些回调发生在另一个线程上。如果那是您想要的行为,您将必须在回调中自己实现它。
不幸的是,所有方法的订阅都在同一个线程中(onNext
、onError
和 onCompleted
但是您可以在 Schedulers.io()
和 onNext(T t)
方法中观察,创建一个新的 Observable
以在 MainThread
中监听,如下所示:
ParseObservable.find(myQuery)
.map(myMapFunc())
.subscribeOn(Schedulers.io())
.subscribe(
new Subscriber<MyObj>() {
@Override
public void onError(Throwable e) {
Log.e("error","error",e);
}
@Override
public void onNext(T t) {
Observable.just(t)
.observeOn(AndroidSchedulers.mainThread())
.subscribe((t) -> {
// do something in MainThread
})
}
public void onCompleted() {
// ... worker thread again instead of mainThread
}
}
)
);
希望对您有所帮助!
在这种情况下,我建议使用 "side action" 运算符。在我看来,这是一个比使用嵌套可观察对象稍微优雅的解决方案:
ParseObservable.find(myQuery)
.map(myMapFunc())
.subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
.doOnCompleted(() -> onCompleteAction())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(value -> onNext(value))
.subscribe();
不建议在订阅中订阅。
subscribeOn
决定当观察者订阅时 Observable 链开始的位置。
observeOn
可以在整个可观察链的不同点使用(如果需要,可以多次使用)以 在线程之间传递控制权 。 (您可以通过检查您是否在每个块中的主线程上来验证这一点)。
ParseObservable.find(myQuery)
.map(myMapFunc())
// Added this:
.doOnNext(obj -> {
// NOTE: This will happen on your `subscribeOn` scheduler
// Do something with `obj` here while on worker thread
}
.subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
// Added this:
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<>() {
next -> {
// NOTE: This will happen on the main thread
},
error -> {
Log.e("error","error",e);
// NOTE: This will happen on the main thread
},
() -> {
// NOTE: This will happen on the main thread
}
});
我正在使用 RxParse 来解析查询的异步加载,但是当我使用 subscribeOn(Schedulers.io()) 订阅我的可观察对象时,我的 onCompleted 方法从未在主线程上被调用。取而代之的是,我的 onCompleted 方法在工作线程池内部被调用。如果我使用 observeOn(AndroidSchedulers.mainThread) 一切都会正常工作,但我的 onNextMethod 也会在主线程上调用,我不想要它。
我的代码有问题吗?
我的代码有什么问题吗?
ParseObservable.find(myQuery)
.map(myMapFunc())
.subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
.subscribe(
new Subscriber<MyObj>() {
@Override
public void onError(Throwable e) {
Log.e("error","error",e);
}
@Override
public void onNext(T t) {
// ... worker thread (but here is ok)
}
public void onCompleted() {
// ... worker thread again instead of mainThread
}
}
)
);
首先你要明白subscribeOn()
和observeOn()
的区别。这是两个完全不同的运算符,它们影响 Rx 链的不同部分。
subscribeOn()
指定 Observable 将在哪里工作。它不会影响 onNext()
、onError()
和 onComplete()
的执行位置。
observeOn()
指定执行回调(例如 onNext()
)的位置。它不会影响您的 Observable 在哪里工作。
所有回调将发生在同一个线程上。您不能通过任何 RxJava API 指定某些回调发生在一个线程上,而某些回调发生在另一个线程上。如果那是您想要的行为,您将必须在回调中自己实现它。
不幸的是,所有方法的订阅都在同一个线程中(onNext
、onError
和 onCompleted
但是您可以在 Schedulers.io()
和 onNext(T t)
方法中观察,创建一个新的 Observable
以在 MainThread
中监听,如下所示:
ParseObservable.find(myQuery)
.map(myMapFunc())
.subscribeOn(Schedulers.io())
.subscribe(
new Subscriber<MyObj>() {
@Override
public void onError(Throwable e) {
Log.e("error","error",e);
}
@Override
public void onNext(T t) {
Observable.just(t)
.observeOn(AndroidSchedulers.mainThread())
.subscribe((t) -> {
// do something in MainThread
})
}
public void onCompleted() {
// ... worker thread again instead of mainThread
}
}
)
);
希望对您有所帮助!
在这种情况下,我建议使用 "side action" 运算符。在我看来,这是一个比使用嵌套可观察对象稍微优雅的解决方案:
ParseObservable.find(myQuery)
.map(myMapFunc())
.subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
.doOnCompleted(() -> onCompleteAction())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(value -> onNext(value))
.subscribe();
不建议在订阅中订阅。
subscribeOn
决定当观察者订阅时 Observable 链开始的位置。
observeOn
可以在整个可观察链的不同点使用(如果需要,可以多次使用)以 在线程之间传递控制权 。 (您可以通过检查您是否在每个块中的主线程上来验证这一点)。
ParseObservable.find(myQuery)
.map(myMapFunc())
// Added this:
.doOnNext(obj -> {
// NOTE: This will happen on your `subscribeOn` scheduler
// Do something with `obj` here while on worker thread
}
.subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
// Added this:
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<>() {
next -> {
// NOTE: This will happen on the main thread
},
error -> {
Log.e("error","error",e);
// NOTE: This will happen on the main thread
},
() -> {
// NOTE: This will happen on the main thread
}
});