Rxandroid SubscribeOn 和 ObserveOn 有什么区别

Rxandroid What's the difference between SubscribeOn and ObserveOn

我刚刚在学习 Rx-java 和 Rxandroid2,我很困惑 SubscribeOn 和 ObserveOn 之间的主要区别是什么。

SubscribeOn 指定 Observable 将在其上运行的调度程序。 ObserveOn 指定观察者将观察此 Observable 的调度程序。

所以基本上 SubscribeOn 主要是在后台线程上订阅(执行)(你不想在等待可观察对象时阻塞 UI 线程)并且在 ObserveOn 中你想观察结果主线程...

如果您熟悉 AsyncTask,那么 SubscribeOn 类似于 doInBackground 方法,ObserveOn 类似于 onPostExecute...

如果您发现 充满行话:

tl;dr

 Observable.just("Some string")                 
           .map(str -> str.length())              
           .observeOn(Schedulers.computation())   
           .map(length -> 2 * length)   
           .observeOn(AndroidSchedulers.mainThread())
           .subscribeOn(Schedulers.io())
           .subscribe(---)

Observe an observable... perform the map function in an IO thread (since we are "subscribingOn" that thread)... now switch to a Computation Thread and perform map(length -> 2 * length) function... and finally make sure you Observe the output on (observeOn()) Main thread.

无论如何,

observeOn()只是将所有操作员的线程进一步下游。人们通常有这样一种 误解 ,认为 observeOn 也是 上游 ,但事实并非如此。

下面的例子会更好地解释...

Observable.just("Some string")                  // UI
       .map(str -> str.length())               // UI
       .observeOn(Schedulers.computation())   // Changing the thread
       .map(length -> 2 * length)            // Computation
       .subscribe(---)

subscribeOn()影响 当订阅 Observable 时将要使用的线程,它将留在下游。

Observable.just("Some String")              // Computation
  .map(str -> str.length())                // Computation
  .map(length -> 2 * length)              // Computation
  .subscribeOn(Schedulers.computation()) // -- changing the thread
  .subscribe(number -> Log.d("", "Number " + number));// Computation

Position does not matter (subscribeOn())

为什么? 因为只影响订阅时间

Methods that obey the contact with subscribeOn

-> 基本示例:Observable.create

create 体内指定的所有工作将 运行 在 subscribeOn 中指定的线程上。

另一个例子:Observable.justObservable.fromObservable.range

注意:所有这些方法都接受值,所以不要使用阻塞方法来创建这些值,因为 subscribeOn 不会影响它。

如果要使用阻塞函数,请使用

Observable.defer(() -> Obervable.just(blockingMenthod())));

Important Fact:

subscribeOn 不适用于 Subjects

Multiple subscribeOn:

如果流中有多个subscribeOn实例,只有第一个有实际效果。

Subscribe & subscribeOn

人们认为subscribeOnObservable.subscribe有关系,但并没有什么特别的关系。 只影响订阅阶段

资料来源:Tomek Polański (Medium)

总结

  • 使用 observeOn 回调 设置线程“进一步向下(在它下面)”,例如 doOnNext 或 [= 中的代码块16=].
  • 使用subscribeOn初始化设置线程“上游(在它之上)”,例如doOnSubscribeObservable.justObservable.create.
  • 两种方法都可以调用多次,每次调用都会覆盖之前的调用。 位置很重要。

让我们通过一个例子来了解这个主题:我们想要找到字符串“user1032613”的长度。这对计算机来说不是一件容易的事,所以我们很自然地在后台线程中执行密集计算,以避免冻结应用程序。

观察

我们可以根据需要多次调用 observeOn,它控制哪个线程在它下面回调 运行。它简单易用,效果如您所愿。

比如我们会在主UI线程显示一个进度条,然后在另一个线程做intensive/blocking操作,然后回到主UI线程更新结果:

    Observable.just("user1032613")

            .observeOn(mainThread) // set thread for operation 1
            .doOnNext {
                /* operation 1 */
                print("display progress bar")
                progressBar.visibility = View.VISIBLE
            }

            .observeOn(backThread) // set thread for operation 2 and 3
            .map {
                /* operation 2 */
                print("calculating")
                Thread.sleep(5000)
                it.length
            }

            .doOnNext {
                /* operation 3 */
                print("finished calculating")
            }

            .observeOn(mainThread) // set thread for operation 4
            .doOnNext {
                /* operation 4 */
                print("hide progress bar and display result")
                progressBar.visibility = View.GONE
                resultTextView.text = "There're $it characters!"
            }

            .subscribe()

在上面的例子中,/* operation 1 */mainThread中是运行,因为我们在它正上方的一行中使用observeOn(mainThread)设置它;然后我们通过再次调用 observeOn 切换到 backThread,因此 /* operation 2 */ 将在那里 运行。因为我们在链接 /* operation 3 */ 之前没有更改它,所以它也会在后台线程中 运行 ,就像 /* operation 2 */ 一样;最后我们再次调用 observeOn(mainThread),以确保 /* operation 4 */ 从主线程更新 UI。

订阅

所以我们了解到 observeOn 为后续回调设置线程。我们还缺少什么?那么,Observable本身,以及它的just()create()subscribe()等方法,也是需要执行的代码。这就是对象沿流传递的方式。我们使用 subscribeOn 为与 Observable 本身相关的代码设置线程。

如果我们删除所有回调(由前面讨论的 observeOn 控制),我们将留下“骨架代码”,默认情况下,运行 在代码编写的任何线程上在(可能是主线程):

    Observable.just("user1032613")
            .observeOn(mainThread)
            .doOnNext {
            }
            .observeOn(backThread)
            .map {
            }
            .doOnNext {
            }
            .observeOn(mainThread)
            .doOnNext {
            }
            .subscribe()

如果我们对主线程上的空框架代码 运行 不满意,我们可以使用 subscribeOn 来更改它。例如,也许第一行 Observable.just("user1032613") 并不像从我的用户名创建流那么简单 - 也许它是来自 Internet 的字符串,或者您可能正在使用 doOnSubscribe 进行其他一些密集型操作。在这种情况下,您可以调用 subscribeOn(backThread) 将一些代码放在另一个线程中。

放在哪里subscribeOn

在撰写此答案时,存在一些误解,如“只调用一次”、“位置无关紧要”和“如果调用多次,则只有第一次有效”。经过大量研究和实验,事实证明 subscribeOn 可以多次调用。

因为 Observable 使用 Builder Pattern(“一个接一个地链接方法”的奇特名称),所以 subscribeOn 以相反的顺序应用。因此,此方法为 上面的代码 设置线程,与 observeOn.

正好相反

我们可以使用 doOnSubscribe 方法进行实验。此方法在订阅事件上触发,它 运行s 在由 subscribeOn:

设置的线程上
    Observable.just("user1032613")
            .doOnSubscribe {
                print("#3 running on main thread")
            }
            .subscribeOn(mainThread) // set thread for #3 and just()
            .doOnNext {
            }
            .map {
            }
            .doOnSubscribe {
                print("#2 running on back thread")
            }
            .doOnNext {
            }
            .subscribeOn(backThread) // set thread for #2 above
            .doOnNext {
            }
            .doOnSubscribe {
                print("#1 running on default thread")
            }
            .subscribe()

如果您从下到上阅读上面的示例,可能更容易理解逻辑,就像构建器模式执行代码的方式一样。

在此示例中,第一行 Observable.just("user1032613")print("#3") 在同一线程中 运行,因为它们之间没有更多的 subscribeOn。对于那些只关心 just()create() 中的代码的人来说,这会造成“只有第一次调用很重要”的错觉。这个quickly falls apart once you start doing more.


脚注:

示例中的线程和print()函数定义如下,为简洁起见:

val mainThread = AndroidSchedulers.mainThread()
val backThread = Schedulers.computation()
private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")

如果有人觉得 rx java 描述难以理解(比如我),这里是纯粹的 java 解释:

subscribeOn()

Observable.just("something")
  .subscribeOn(Schedulers.newThread())
  .subscribe(...);

相当于:

Observable observable = Observable.just("something");
new Thread(() -> observable.subscribe(...)).start();

因为 Observablesubscribe() 上发出值,而这里 subscribe() 进入单独的线程,这些值也与 subscribe() 在同一线程中发出。这就是它工作的原因 "upstream" (影响之前操作的线程)和 "downstream".

observeOn()

Observable.just("something")
  .observeOn(Schedulers.newThread())
  .subscribe(...);

相当于:

Observable observable = Observable.just("something")
  .subscribe(it -> new Thread(() -> ...).start());

这里Observable在主线程中发出值,只有监听器方法在单独的线程中执行。

这个回答不是什么新鲜事,我只是想再澄清一点。

  1. 假设我们有两个线程。

     val pool1 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 1") }
     val pool2 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 2") }
    

  1. 如答案所述,observeOn 将设置 Downstream,而 subscribeOn 将设置 Upstream。但是如果两者都用呢?为了检查这一点,我逐行添加了日志。

    Observable.just("what if use both")
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe A " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext A " + Thread.currentThread().name) }
     .map {
         Log.d("Thread", "both, map A " + Thread.currentThread().name)
         it + " A"
     }
    
     // observeOn
     .observeOn(Schedulers.from(pool1)) 
    
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe B " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext B " + Thread.currentThread().name) }
     .map {
         Log.d("Thread", "both, map B " + Thread.currentThread().name)
         it + " B"
     }
    
     // subscribeOn
     .subscribeOn(Schedulers.from(pool2)) 
    
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe C " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext C " + Thread.currentThread().name) }
     .map {
        Log.d("Thread", "both, map C " + Thread.currentThread().name)
        it + " C"
      }
    
     // observeOn main
     .observeOn(AndroidSchedulers.mainThread())
     .doOnNext { Log.d("Thread", "main " + Thread.currentThread().name) }
     .subscribe(
         { result -> Log.d("Thread", "main subscribe " + Thread.currentThread().name)}
         , { error -> {} }
         )
    

结果是:

both, doOnSubscribe C main
both, doOnSubscribe A Thread 2
both, doOnSubscribe B Thread 2

both, doOnNext A Thread 2
both, map A Thread 2

both, doOnNext B Thread 1
both, map B Thread 1

both, doOnNext C Thread 1
both, map C Thread 1

main main
main subscribe main
result: what if use both A B C

如您所见,doOnSubscribe 首先被调用,从下到上。这意味着 subscribe 优先于其他运算符,因此处理第一个代码的第一个线程是 线程 2.

然后一行一行地调用了其他算子。在 observeOn 之后,线程更改为 Thread 1。然后,就在调用 subscribe 之前,再次调用 observeOn 以将线程更改为主线程。 (不用管AndroidSchedulers,它只是一种调度器)

TL;DR;

  • 第一条路径,subscribeOn 首先调用,从下到上。
  • 第二条路径,observeOn 调用,从上到下,以及其他代码。
  • RxJava2 和 RxJava3 上的行为相同

当您订阅一个 observable 时,一个流程开始工作,它一直向上到链的顶部,然后再次向下。订阅部分与向上链接相关,观察部分与向下链接相关。

一旦到达链顶,订阅阶段就基本完成了。事件开始发出,映射、过滤器等的向下链被调用。

SubscribeOn 影响 subscription 调用,例如 doOnSubscribe。

ObserveOn 影响 observation 低于其位置的调用,例如 doOnNext、map、flatmap 等

两者都会改变用于继续向上或向下流动的线程。

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;

public class SubscribeVsObserveOn {

    public static void main(String[] args) throws InterruptedException {

        System.out.println("Ordinal 0: " + Thread.currentThread().getName());

        final CountDownLatch latch = new CountDownLatch(1);

        Observable
            .just("a regular string.")
            .doOnSubscribe(disposable ->
                System.out.println("Ordinal 2: " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .doOnNext(s ->
                System.out.println("Ordinal 3: " + Thread.currentThread().getName()))
            .map(s -> s)
            .doOnSubscribe(disposable ->
                System.out.println("Ordinal 1: " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .doOnNext(s ->
                System.out.println("Ordinal 4: " + Thread.currentThread().getName()))
            .map(s -> s)
            .subscribe(s -> latch.countDown());

        latch.await();
    }
}

这是输出:

Ordinal 0: main
Ordinal 1: RxNewThreadScheduler-1
Ordinal 2: RxNewThreadScheduler-2
Ordinal 3: RxNewThreadScheduler-3
Ordinal 4: RxNewThreadScheduler-4