Retrofit 中的 Rxjava observeOn 和 subscribeOn

Rxjava observeOn and subscribeOn in Retrofit

观察: 这个方法简单地改变了下游所有操作员的线程 (https://medium.com/upday-devs/rxjava-subscribeon-vs-observeon-9af518ded53a)

调用 API 时,我想 运行 在 IO 线程上与服务器通信,并希望在 mainThread 上处理结果。

我在很多教程中看到下面的代码,毫无疑问它是正确的。 但是我的理解是相反的所以我想知道我的误解是什么

requestInterface.callApi()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe())

observeOn(AndroidSchedulers.mainThread())

:observeOn将所有算子的线程更往下游改变,但例子中,实际调用API函数比observeOn?

更上层

.subscribeOn(Schedulers.io())

:奇怪的部分,它需要在主线程上订阅,但是在IO线程上订阅?

请指教 我哪里误会了?

基本,我们会有

Observable.subscribe(Observer);// => Observer observe Observable and Observable subscribe Observer

示例

requestInterface.callApi().subscribe(new Observer...); // requestInterface.callApi() <=> Observable

来自http://reactivex.io/documentation/operators/subscribeon.html

订阅

  • SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called

ObserveOn(影响 2 件事)

  • It instructs the Observable to send notifications to Observers on a specified Scheduler.

  • ObserveOn affects the thread that the Observable will use below where that operator appears

例子

registerUserReturnedObserverble()  // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
.andThen(loginReturnObserverble()) // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
.observeOn(AndroidSchedulers.mainThread())
.andThen(getUserDataReturnObserverble()) // run on main thread because .observeOn(AndroidSchedulers.mainThread()) is above this operator (line 3)
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>{
    // run on main thread because observeOn(AndroidSchedulers.mainThread()) 
});

这是一个示例:

      getCardsObservable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new rx.Observer<List<Card>>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    listener.onError(e.getMessage());
                }

                @Override
                public void onNext(List<Card> cards) {
                    listener.onSuccess(cards);
                }
            });

订阅 --> 执行调用的线程类似于调用 asynctask

observeOn --> 将在 UI 线程

的进程中观察响应

订阅 --> 观察者回调

  • subscribeOn(Schedulers.io()):这告诉 Observable 运行 后台线程上的任务
  • observeOn(AndroidSchedulers.mainThread()):这告诉 Observer 在 android UI 线程上接收数据,以便您可以采取任何 UI 相关行动。

以下情况指定了使用 observeOn() and/Or subscribeOn() 时可能出现的所有不同情况。

  1. subscribeOn 影响 upstream 运营商(subscribeOn 之上的运营商)
  2. observeOn影响下游运营商(observeOn以下的运营商)
  3. 如果你在RxJava中没有指定线程(如果你没有指定subscribeOnobserveOn或两者),数据将由当前[=82=发送和处理](通常是主线程)。例如,下面链中的所有运算符都将由当前线程处理(如果是 Android,则为主线程)。
Observable
.just("big", "bigger", "biggest")    
.map(String::length)    
.filter { it > 6 }    
.subscribe { length -> println("item length $length") }
  1. 如果只指定subscribeOn,所有算子都会在那个线程上执行

    在此处输入代码

Observable
.just("big", "bigger", "biggest") 
.subscribeOn(Schedulers.io())    
.map(String::length)    
.filter { it > 6 }    
.subscribe { length -> println("item length $length") }

数据发射,map 和 filter 运算符将按照上游运算符的指示在 io 调度程序上执行 subscribeOn

  1. 如果只指定observeOn,则所有算子都在当前线程执行,只有observeOn以下的算子才会切换到observeOn[=70]指定的线程=]
Observable
.just("big", "bigger", "biggest")   
.map(String::length)
.observeOn(Schedulers.computation())     
.filter { it > 6 }    
.subscribe { length -> println("item length $length") }

数据发射,地图将在当前线程调度程序上执行。

过滤器将按照下游操作员的指示在计算调度程序上执行 observeOn

  1. 如果同时指定了subscribeOnobserveOn,则observeOn以下的所有算子都会切换到observeOn指定的线程,其余所有算子在上面observeOn 切换到 subscribeOn 指定的线程。这适用于您指定 subscribeOnobserveOn
  2. 的任何顺序
Observable
.just("big", "bigger", "biggest")   
.subscribeOn(Schedulers.io()) 
.map(String::length)
.observeOn(Schedulers.computation())     
.filter { it > 6 }    
.subscribe { length -> println("item length $length") }

数据发射和映射运算符将按照上游运算符的指示在 io 调度程序上执行 subscribeOn

过滤器将按照下游操作员的指示在计算调度程序上执行 observeOn

即使在 observeOn 之后调用 subscribeOn,线程使用也是一样的。

Observable
.just("big", "bigger", "biggest")  
.map(String::length)
.observeOn(Schedulers.computation()) 
.filter { it > 6 }   
.subscribeOn(Schedulers.io()) 
.subscribe { length -> println("item length $length") }