Retrofit 中的 Rxjava observeOn 和 subscribeOn
Rxjava observeOn and subscribeOn in Retrofit
观察:
这个方法简单地改变了下游所有操作员的线程
(https://medium.com/upday-devs/rxjava-subscribeon-vs-observeon-9af518ded53a)
调用 API 时,我想 运行 在 IO 线程上与服务器通信,并希望在 mainThread 上处理结果。
我在很多教程中看到下面的代码,毫无疑问它是正确的。
但是我的理解是相反的所以我想知道我的误解是什么
requestInterface.callApi()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe())
observeOn(AndroidSchedulers.mainThread())
:observeOn将所有算子的线程更往下游改变,但例子中,实际调用API函数比observeOn?
更上层
.subscribeOn(Schedulers.io())
:奇怪的部分,它需要在主线程上订阅,但是在IO线程上订阅?
请指教 我哪里误会了?
基本,我们会有
Observable.subscribe(Observer);// => Observer observe Observable and Observable subscribe Observer
示例
requestInterface.callApi().subscribe(new Observer...); // requestInterface.callApi() <=> Observable
来自http://reactivex.io/documentation/operators/subscribeon.html
订阅
- SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called
ObserveOn(影响 2 件事)
It instructs the Observable to send notifications to Observers on a specified Scheduler.
ObserveOn affects the thread that the Observable will use below where that operator appears
例子
registerUserReturnedObserverble() // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
.andThen(loginReturnObserverble()) // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
.observeOn(AndroidSchedulers.mainThread())
.andThen(getUserDataReturnObserverble()) // run on main thread because .observeOn(AndroidSchedulers.mainThread()) is above this operator (line 3)
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>{
// run on main thread because observeOn(AndroidSchedulers.mainThread())
});
这是一个示例:
getCardsObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new rx.Observer<List<Card>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
listener.onError(e.getMessage());
}
@Override
public void onNext(List<Card> cards) {
listener.onSuccess(cards);
}
});
订阅 -->
执行调用的线程类似于调用 asynctask
observeOn --> 将在 UI 线程
的进程中观察响应
订阅 --> 观察者回调
- subscribeOn(Schedulers.io()):这告诉 Observable 运行 后台线程上的任务
- observeOn(AndroidSchedulers.mainThread()):这告诉 Observer 在 android UI 线程上接收数据,以便您可以采取任何 UI 相关行动。
以下情况指定了使用 observeOn() and/Or subscribeOn() 时可能出现的所有不同情况。
subscribeOn
影响 upstream 运营商(subscribeOn 之上的运营商)
observeOn
影响下游运营商(observeOn以下的运营商)
- 如果你在RxJava中没有指定线程(如果你没有指定
subscribeOn
、observeOn
或两者),数据将由当前[=82=发送和处理](通常是主线程)。例如,下面链中的所有运算符都将由当前线程处理(如果是 Android,则为主线程)。
Observable
.just("big", "bigger", "biggest")
.map(String::length)
.filter { it > 6 }
.subscribe { length -> println("item length $length") }
如果只指定subscribeOn,所有算子都会在那个线程上执行
在此处输入代码
Observable
.just("big", "bigger", "biggest")
.subscribeOn(Schedulers.io())
.map(String::length)
.filter { it > 6 }
.subscribe { length -> println("item length $length") }
数据发射,map 和 filter 运算符将按照上游运算符的指示在 io 调度程序上执行 subscribeOn
。
- 如果只指定
observeOn
,则所有算子都在当前线程执行,只有observeOn
以下的算子才会切换到observeOn
[=70]指定的线程=]
Observable
.just("big", "bigger", "biggest")
.map(String::length)
.observeOn(Schedulers.computation())
.filter { it > 6 }
.subscribe { length -> println("item length $length") }
数据发射,地图将在当前线程调度程序上执行。
过滤器将按照下游操作员的指示在计算调度程序上执行 observeOn
。
- 如果同时指定了
subscribeOn
和observeOn
,则observeOn
以下的所有算子都会切换到observeOn
指定的线程,其余所有算子在上面observeOn
切换到 subscribeOn
指定的线程。这适用于您指定 subscribeOn
和 observeOn
的任何顺序
Observable
.just("big", "bigger", "biggest")
.subscribeOn(Schedulers.io())
.map(String::length)
.observeOn(Schedulers.computation())
.filter { it > 6 }
.subscribe { length -> println("item length $length") }
数据发射和映射运算符将按照上游运算符的指示在 io 调度程序上执行 subscribeOn
。
过滤器将按照下游操作员的指示在计算调度程序上执行 observeOn
。
即使在 observeOn 之后调用 subscribeOn,线程使用也是一样的。
Observable
.just("big", "bigger", "biggest")
.map(String::length)
.observeOn(Schedulers.computation())
.filter { it > 6 }
.subscribeOn(Schedulers.io())
.subscribe { length -> println("item length $length") }
观察: 这个方法简单地改变了下游所有操作员的线程 (https://medium.com/upday-devs/rxjava-subscribeon-vs-observeon-9af518ded53a)
调用 API 时,我想 运行 在 IO 线程上与服务器通信,并希望在 mainThread 上处理结果。
我在很多教程中看到下面的代码,毫无疑问它是正确的。 但是我的理解是相反的所以我想知道我的误解是什么
requestInterface.callApi()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe())
observeOn(AndroidSchedulers.mainThread())
:observeOn将所有算子的线程更往下游改变,但例子中,实际调用API函数比observeOn?
更上层.subscribeOn(Schedulers.io())
:奇怪的部分,它需要在主线程上订阅,但是在IO线程上订阅?
请指教 我哪里误会了?
基本,我们会有
Observable.subscribe(Observer);// => Observer observe Observable and Observable subscribe Observer
示例
requestInterface.callApi().subscribe(new Observer...); // requestInterface.callApi() <=> Observable
来自http://reactivex.io/documentation/operators/subscribeon.html
订阅
- SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called
ObserveOn(影响 2 件事)
It instructs the Observable to send notifications to Observers on a specified Scheduler.
ObserveOn affects the thread that the Observable will use below where that operator appears
例子
registerUserReturnedObserverble() // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
.andThen(loginReturnObserverble()) // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
.observeOn(AndroidSchedulers.mainThread())
.andThen(getUserDataReturnObserverble()) // run on main thread because .observeOn(AndroidSchedulers.mainThread()) is above this operator (line 3)
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>{
// run on main thread because observeOn(AndroidSchedulers.mainThread())
});
这是一个示例:
getCardsObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new rx.Observer<List<Card>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
listener.onError(e.getMessage());
}
@Override
public void onNext(List<Card> cards) {
listener.onSuccess(cards);
}
});
订阅 --> 执行调用的线程类似于调用 asynctask
observeOn --> 将在 UI 线程
的进程中观察响应订阅 --> 观察者回调
- subscribeOn(Schedulers.io()):这告诉 Observable 运行 后台线程上的任务
- observeOn(AndroidSchedulers.mainThread()):这告诉 Observer 在 android UI 线程上接收数据,以便您可以采取任何 UI 相关行动。
以下情况指定了使用 observeOn() and/Or subscribeOn() 时可能出现的所有不同情况。
subscribeOn
影响 upstream 运营商(subscribeOn 之上的运营商)observeOn
影响下游运营商(observeOn以下的运营商)- 如果你在RxJava中没有指定线程(如果你没有指定
subscribeOn
、observeOn
或两者),数据将由当前[=82=发送和处理](通常是主线程)。例如,下面链中的所有运算符都将由当前线程处理(如果是 Android,则为主线程)。
Observable .just("big", "bigger", "biggest") .map(String::length) .filter { it > 6 } .subscribe { length -> println("item length $length") }
如果只指定subscribeOn,所有算子都会在那个线程上执行
在此处输入代码
Observable .just("big", "bigger", "biggest") .subscribeOn(Schedulers.io()) .map(String::length) .filter { it > 6 } .subscribe { length -> println("item length $length") }
数据发射,map 和 filter 运算符将按照上游运算符的指示在 io 调度程序上执行 subscribeOn
。
- 如果只指定
observeOn
,则所有算子都在当前线程执行,只有observeOn
以下的算子才会切换到observeOn
[=70]指定的线程=]
Observable .just("big", "bigger", "biggest") .map(String::length) .observeOn(Schedulers.computation()) .filter { it > 6 } .subscribe { length -> println("item length $length") }
数据发射,地图将在当前线程调度程序上执行。
过滤器将按照下游操作员的指示在计算调度程序上执行 observeOn
。
- 如果同时指定了
subscribeOn
和observeOn
,则observeOn
以下的所有算子都会切换到observeOn
指定的线程,其余所有算子在上面observeOn
切换到subscribeOn
指定的线程。这适用于您指定subscribeOn
和observeOn
的任何顺序
Observable .just("big", "bigger", "biggest") .subscribeOn(Schedulers.io()) .map(String::length) .observeOn(Schedulers.computation()) .filter { it > 6 } .subscribe { length -> println("item length $length") }
数据发射和映射运算符将按照上游运算符的指示在 io 调度程序上执行 subscribeOn
。
过滤器将按照下游操作员的指示在计算调度程序上执行 observeOn
。
即使在 observeOn 之后调用 subscribeOn,线程使用也是一样的。
Observable .just("big", "bigger", "biggest") .map(String::length) .observeOn(Schedulers.computation()) .filter { it > 6 } .subscribeOn(Schedulers.io()) .subscribe { length -> println("item length $length") }