Rxandroid SubscribeOn 和 ObserveOn 有什么区别
Rxandroid What's the difference between SubscribeOn and ObserveOn
我刚刚在学习 Rx-java 和 Rxandroid2,我很困惑 SubscribeOn 和 ObserveOn 之间的主要区别是什么。
SubscribeOn 指定 Observable 将在其上运行的调度程序。
ObserveOn 指定观察者将观察此 Observable 的调度程序。
所以基本上 SubscribeOn 主要是在后台线程上订阅(执行)(你不想在等待可观察对象时阻塞 UI 线程)并且在 ObserveOn 中你想观察结果主线程...
如果您熟悉 AsyncTask,那么 SubscribeOn 类似于 doInBackground 方法,ObserveOn 类似于 onPostExecute...
如果您发现 充满行话:
tl;dr
Observable.just("Some string")
.map(str -> str.length())
.observeOn(Schedulers.computation())
.map(length -> 2 * length)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(---)
Observe an observable... perform the map function in an IO thread (since we are "subscribingOn"
that thread)... now switch to a Computation Thread and perform map(length -> 2 * length)
function... and finally make sure you Observe the output on (observeOn()
) Main thread.
无论如何,
observeOn()
只是将所有操作员的线程进一步下游。人们通常有这样一种 误解 ,认为 observeOn
也是 上游 ,但事实并非如此。
下面的例子会更好地解释...
Observable.just("Some string") // UI
.map(str -> str.length()) // UI
.observeOn(Schedulers.computation()) // Changing the thread
.map(length -> 2 * length) // Computation
.subscribe(---)
subscribeOn()
仅影响 当订阅 Observable 时将要使用的线程,它将留在下游。
Observable.just("Some String") // Computation
.map(str -> str.length()) // Computation
.map(length -> 2 * length) // Computation
.subscribeOn(Schedulers.computation()) // -- changing the thread
.subscribe(number -> Log.d("", "Number " + number));// Computation
Position does not matter (subscribeOn()
)
为什么?
因为只影响订阅时间
Methods that obey the contact with subscribeOn
-> 基本示例:Observable.create
create
体内指定的所有工作将 运行 在 subscribeOn
中指定的线程上。
另一个例子:Observable.just
、Observable.from
或Observable.range
注意:所有这些方法都接受值,所以不要使用阻塞方法来创建这些值,因为 subscribeOn 不会影响它。
如果要使用阻塞函数,请使用
Observable.defer(() -> Obervable.just(blockingMenthod())));
Important Fact:
subscribeOn 不适用于 Subjects
Multiple subscribeOn
:
如果流中有多个subscribeOn
实例,只有第一个有实际效果。
Subscribe & subscribeOn
人们认为subscribeOn
与Observable.subscribe
有关系,但并没有什么特别的关系。
只影响订阅阶段。
资料来源:Tomek Polański (Medium)
总结
- 使用
observeOn
为 回调 设置线程“进一步向下(在它下面)”,例如 doOnNext
或 [= 中的代码块16=].
- 使用
subscribeOn
为初始化设置线程“上游(在它之上)”,例如doOnSubscribe
、Observable.just
或Observable.create
.
- 两种方法都可以调用多次,每次调用都会覆盖之前的调用。 位置很重要。
让我们通过一个例子来了解这个主题:我们想要找到字符串“user1032613”的长度。这对计算机来说不是一件容易的事,所以我们很自然地在后台线程中执行密集计算,以避免冻结应用程序。
观察
我们可以根据需要多次调用 observeOn
,它控制哪个线程在它下面回调 运行。它简单易用,效果如您所愿。
比如我们会在主UI线程显示一个进度条,然后在另一个线程做intensive/blocking操作,然后回到主UI线程更新结果:
Observable.just("user1032613")
.observeOn(mainThread) // set thread for operation 1
.doOnNext {
/* operation 1 */
print("display progress bar")
progressBar.visibility = View.VISIBLE
}
.observeOn(backThread) // set thread for operation 2 and 3
.map {
/* operation 2 */
print("calculating")
Thread.sleep(5000)
it.length
}
.doOnNext {
/* operation 3 */
print("finished calculating")
}
.observeOn(mainThread) // set thread for operation 4
.doOnNext {
/* operation 4 */
print("hide progress bar and display result")
progressBar.visibility = View.GONE
resultTextView.text = "There're $it characters!"
}
.subscribe()
在上面的例子中,/* operation 1 */
在mainThread
中是运行,因为我们在它正上方的一行中使用observeOn(mainThread)
设置它;然后我们通过再次调用 observeOn
切换到 backThread
,因此 /* operation 2 */
将在那里 运行。因为我们在链接 /* operation 3 */
之前没有更改它,所以它也会在后台线程中 运行 ,就像 /* operation 2 */
一样;最后我们再次调用 observeOn(mainThread)
,以确保 /* operation 4 */
从主线程更新 UI。
订阅
所以我们了解到 observeOn
为后续回调设置线程。我们还缺少什么?那么,Observable
本身,以及它的just()
、create()
、subscribe()
等方法,也是需要执行的代码。这就是对象沿流传递的方式。我们使用 subscribeOn
为与 Observable
本身相关的代码设置线程。
如果我们删除所有回调(由前面讨论的 observeOn
控制),我们将留下“骨架代码”,默认情况下,运行 在代码编写的任何线程上在(可能是主线程):
Observable.just("user1032613")
.observeOn(mainThread)
.doOnNext {
}
.observeOn(backThread)
.map {
}
.doOnNext {
}
.observeOn(mainThread)
.doOnNext {
}
.subscribe()
如果我们对主线程上的空框架代码 运行 不满意,我们可以使用 subscribeOn
来更改它。例如,也许第一行 Observable.just("user1032613")
并不像从我的用户名创建流那么简单 - 也许它是来自 Internet 的字符串,或者您可能正在使用 doOnSubscribe
进行其他一些密集型操作。在这种情况下,您可以调用 subscribeOn(backThread)
将一些代码放在另一个线程中。
放在哪里subscribeOn
在撰写此答案时,存在一些误解,如“只调用一次”、“位置无关紧要”和“如果调用多次,则只有第一次有效”。经过大量研究和实验,事实证明 subscribeOn
可以多次调用。
因为 Observable
使用 Builder Pattern(“一个接一个地链接方法”的奇特名称),所以 subscribeOn
以相反的顺序应用。因此,此方法为 上面的代码 设置线程,与 observeOn
.
正好相反
我们可以使用 doOnSubscribe
方法进行实验。此方法在订阅事件上触发,它 运行s 在由 subscribeOn
:
设置的线程上
Observable.just("user1032613")
.doOnSubscribe {
print("#3 running on main thread")
}
.subscribeOn(mainThread) // set thread for #3 and just()
.doOnNext {
}
.map {
}
.doOnSubscribe {
print("#2 running on back thread")
}
.doOnNext {
}
.subscribeOn(backThread) // set thread for #2 above
.doOnNext {
}
.doOnSubscribe {
print("#1 running on default thread")
}
.subscribe()
如果您从下到上阅读上面的示例,可能更容易理解逻辑,就像构建器模式执行代码的方式一样。
在此示例中,第一行 Observable.just("user1032613")
与 print("#3")
在同一线程中 运行,因为它们之间没有更多的 subscribeOn
。对于那些只关心 just()
或 create()
中的代码的人来说,这会造成“只有第一次调用很重要”的错觉。这个quickly falls apart once you start doing more.
脚注:
示例中的线程和print()
函数定义如下,为简洁起见:
val mainThread = AndroidSchedulers.mainThread()
val backThread = Schedulers.computation()
private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")
如果有人觉得 rx java 描述难以理解(比如我),这里是纯粹的 java 解释:
subscribeOn()
Observable.just("something")
.subscribeOn(Schedulers.newThread())
.subscribe(...);
相当于:
Observable observable = Observable.just("something");
new Thread(() -> observable.subscribe(...)).start();
因为 Observable
在 subscribe()
上发出值,而这里 subscribe()
进入单独的线程,这些值也与 subscribe()
在同一线程中发出。这就是它工作的原因 "upstream" (影响之前操作的线程)和 "downstream".
observeOn()
Observable.just("something")
.observeOn(Schedulers.newThread())
.subscribe(...);
相当于:
Observable observable = Observable.just("something")
.subscribe(it -> new Thread(() -> ...).start());
这里Observable
在主线程中发出值,只有监听器方法在单独的线程中执行。
这个回答不是什么新鲜事,我只是想再澄清一点。
假设我们有两个线程。
val pool1 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 1") }
val pool2 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 2") }
如答案所述,observeOn
将设置 Downstream
,而 subscribeOn
将设置 Upstream
。但是如果两者都用呢?为了检查这一点,我逐行添加了日志。
Observable.just("what if use both")
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe A " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext A " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map A " + Thread.currentThread().name)
it + " A"
}
// observeOn
.observeOn(Schedulers.from(pool1))
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe B " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext B " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map B " + Thread.currentThread().name)
it + " B"
}
// subscribeOn
.subscribeOn(Schedulers.from(pool2))
.doOnSubscribe { Log.d("Thread", "both, doOnSubscribe C " + Thread.currentThread().name) }
.doOnNext { Log.d("Thread", "both, doOnNext C " + Thread.currentThread().name) }
.map {
Log.d("Thread", "both, map C " + Thread.currentThread().name)
it + " C"
}
// observeOn main
.observeOn(AndroidSchedulers.mainThread())
.doOnNext { Log.d("Thread", "main " + Thread.currentThread().name) }
.subscribe(
{ result -> Log.d("Thread", "main subscribe " + Thread.currentThread().name)}
, { error -> {} }
)
结果是:
both, doOnSubscribe C main
both, doOnSubscribe A Thread 2
both, doOnSubscribe B Thread 2
both, doOnNext A Thread 2
both, map A Thread 2
both, doOnNext B Thread 1
both, map B Thread 1
both, doOnNext C Thread 1
both, map C Thread 1
main main
main subscribe main
result: what if use both A B C
如您所见,doOnSubscribe
首先被调用,从下到上。这意味着 subscribe
优先于其他运算符,因此处理第一个代码的第一个线程是 线程 2.
然后一行一行地调用了其他算子。在 observeOn
之后,线程更改为 Thread 1
。然后,就在调用 subscribe
之前,再次调用 observeOn
以将线程更改为主线程。 (不用管AndroidSchedulers,它只是一种调度器)
TL;DR;
- 第一条路径,
subscribeOn
首先调用,从下到上。
- 第二条路径,
observeOn
调用,从上到下,以及其他代码。
- RxJava2 和 RxJava3 上的行为相同
当您订阅一个 observable 时,一个流程开始工作,它一直向上到链的顶部,然后再次向下。订阅部分与向上链接相关,观察部分与向下链接相关。
一旦到达链顶,订阅阶段就基本完成了。事件开始发出,映射、过滤器等的向下链被调用。
SubscribeOn 影响 subscription 调用,例如 doOnSubscribe。
ObserveOn 影响 observation 低于其位置的调用,例如 doOnNext、map、flatmap 等
两者都会改变用于继续向上或向下流动的线程。
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
public class SubscribeVsObserveOn {
public static void main(String[] args) throws InterruptedException {
System.out.println("Ordinal 0: " + Thread.currentThread().getName());
final CountDownLatch latch = new CountDownLatch(1);
Observable
.just("a regular string.")
.doOnSubscribe(disposable ->
System.out.println("Ordinal 2: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.doOnNext(s ->
System.out.println("Ordinal 3: " + Thread.currentThread().getName()))
.map(s -> s)
.doOnSubscribe(disposable ->
System.out.println("Ordinal 1: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.doOnNext(s ->
System.out.println("Ordinal 4: " + Thread.currentThread().getName()))
.map(s -> s)
.subscribe(s -> latch.countDown());
latch.await();
}
}
这是输出:
Ordinal 0: main
Ordinal 1: RxNewThreadScheduler-1
Ordinal 2: RxNewThreadScheduler-2
Ordinal 3: RxNewThreadScheduler-3
Ordinal 4: RxNewThreadScheduler-4
我刚刚在学习 Rx-java 和 Rxandroid2,我很困惑 SubscribeOn 和 ObserveOn 之间的主要区别是什么。
SubscribeOn 指定 Observable 将在其上运行的调度程序。 ObserveOn 指定观察者将观察此 Observable 的调度程序。
所以基本上 SubscribeOn 主要是在后台线程上订阅(执行)(你不想在等待可观察对象时阻塞 UI 线程)并且在 ObserveOn 中你想观察结果主线程...
如果您熟悉 AsyncTask,那么 SubscribeOn 类似于 doInBackground 方法,ObserveOn 类似于 onPostExecute...
如果您发现
tl;dr
Observable.just("Some string")
.map(str -> str.length())
.observeOn(Schedulers.computation())
.map(length -> 2 * length)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(---)
Observe an observable... perform the map function in an IO thread (since we are
"subscribingOn"
that thread)... now switch to a Computation Thread and performmap(length -> 2 * length)
function... and finally make sure you Observe the output on (observeOn()
) Main thread.
无论如何,
observeOn()
只是将所有操作员的线程进一步下游。人们通常有这样一种 误解 ,认为 observeOn
也是 上游 ,但事实并非如此。
下面的例子会更好地解释...
Observable.just("Some string") // UI
.map(str -> str.length()) // UI
.observeOn(Schedulers.computation()) // Changing the thread
.map(length -> 2 * length) // Computation
.subscribe(---)
subscribeOn()
仅影响 当订阅 Observable 时将要使用的线程,它将留在下游。
Observable.just("Some String") // Computation
.map(str -> str.length()) // Computation
.map(length -> 2 * length) // Computation
.subscribeOn(Schedulers.computation()) // -- changing the thread
.subscribe(number -> Log.d("", "Number " + number));// Computation
Position does not matter (
subscribeOn()
)
为什么? 因为只影响订阅时间
Methods that obey the contact with
subscribeOn
-> 基本示例:Observable.create
create
体内指定的所有工作将 运行 在 subscribeOn
中指定的线程上。
另一个例子:Observable.just
、Observable.from
或Observable.range
注意:所有这些方法都接受值,所以不要使用阻塞方法来创建这些值,因为 subscribeOn 不会影响它。
如果要使用阻塞函数,请使用
Observable.defer(() -> Obervable.just(blockingMenthod())));
Important Fact:
subscribeOn 不适用于 Subjects
Multiple
subscribeOn
:
如果流中有多个subscribeOn
实例,只有第一个有实际效果。
Subscribe &
subscribeOn
人们认为subscribeOn
与Observable.subscribe
有关系,但并没有什么特别的关系。
只影响订阅阶段。
资料来源:Tomek Polański (Medium)
总结
- 使用
observeOn
为 回调 设置线程“进一步向下(在它下面)”,例如doOnNext
或 [= 中的代码块16=]. - 使用
subscribeOn
为初始化设置线程“上游(在它之上)”,例如doOnSubscribe
、Observable.just
或Observable.create
. - 两种方法都可以调用多次,每次调用都会覆盖之前的调用。 位置很重要。
让我们通过一个例子来了解这个主题:我们想要找到字符串“user1032613”的长度。这对计算机来说不是一件容易的事,所以我们很自然地在后台线程中执行密集计算,以避免冻结应用程序。
观察
我们可以根据需要多次调用 observeOn
,它控制哪个线程在它下面回调 运行。它简单易用,效果如您所愿。
比如我们会在主UI线程显示一个进度条,然后在另一个线程做intensive/blocking操作,然后回到主UI线程更新结果:
Observable.just("user1032613")
.observeOn(mainThread) // set thread for operation 1
.doOnNext {
/* operation 1 */
print("display progress bar")
progressBar.visibility = View.VISIBLE
}
.observeOn(backThread) // set thread for operation 2 and 3
.map {
/* operation 2 */
print("calculating")
Thread.sleep(5000)
it.length
}
.doOnNext {
/* operation 3 */
print("finished calculating")
}
.observeOn(mainThread) // set thread for operation 4
.doOnNext {
/* operation 4 */
print("hide progress bar and display result")
progressBar.visibility = View.GONE
resultTextView.text = "There're $it characters!"
}
.subscribe()
在上面的例子中,/* operation 1 */
在mainThread
中是运行,因为我们在它正上方的一行中使用observeOn(mainThread)
设置它;然后我们通过再次调用 observeOn
切换到 backThread
,因此 /* operation 2 */
将在那里 运行。因为我们在链接 /* operation 3 */
之前没有更改它,所以它也会在后台线程中 运行 ,就像 /* operation 2 */
一样;最后我们再次调用 observeOn(mainThread)
,以确保 /* operation 4 */
从主线程更新 UI。
订阅
所以我们了解到 observeOn
为后续回调设置线程。我们还缺少什么?那么,Observable
本身,以及它的just()
、create()
、subscribe()
等方法,也是需要执行的代码。这就是对象沿流传递的方式。我们使用 subscribeOn
为与 Observable
本身相关的代码设置线程。
如果我们删除所有回调(由前面讨论的 observeOn
控制),我们将留下“骨架代码”,默认情况下,运行 在代码编写的任何线程上在(可能是主线程):
Observable.just("user1032613")
.observeOn(mainThread)
.doOnNext {
}
.observeOn(backThread)
.map {
}
.doOnNext {
}
.observeOn(mainThread)
.doOnNext {
}
.subscribe()
如果我们对主线程上的空框架代码 运行 不满意,我们可以使用 subscribeOn
来更改它。例如,也许第一行 Observable.just("user1032613")
并不像从我的用户名创建流那么简单 - 也许它是来自 Internet 的字符串,或者您可能正在使用 doOnSubscribe
进行其他一些密集型操作。在这种情况下,您可以调用 subscribeOn(backThread)
将一些代码放在另一个线程中。
放在哪里subscribeOn
在撰写此答案时,存在一些误解,如“只调用一次”、“位置无关紧要”和“如果调用多次,则只有第一次有效”。经过大量研究和实验,事实证明 subscribeOn
可以多次调用。
因为 Observable
使用 Builder Pattern(“一个接一个地链接方法”的奇特名称),所以 subscribeOn
以相反的顺序应用。因此,此方法为 上面的代码 设置线程,与 observeOn
.
我们可以使用 doOnSubscribe
方法进行实验。此方法在订阅事件上触发,它 运行s 在由 subscribeOn
:
Observable.just("user1032613")
.doOnSubscribe {
print("#3 running on main thread")
}
.subscribeOn(mainThread) // set thread for #3 and just()
.doOnNext {
}
.map {
}
.doOnSubscribe {
print("#2 running on back thread")
}
.doOnNext {
}
.subscribeOn(backThread) // set thread for #2 above
.doOnNext {
}
.doOnSubscribe {
print("#1 running on default thread")
}
.subscribe()
如果您从下到上阅读上面的示例,可能更容易理解逻辑,就像构建器模式执行代码的方式一样。
在此示例中,第一行 Observable.just("user1032613")
与 print("#3")
在同一线程中 运行,因为它们之间没有更多的 subscribeOn
。对于那些只关心 just()
或 create()
中的代码的人来说,这会造成“只有第一次调用很重要”的错觉。这个quickly falls apart once you start doing more.
脚注:
示例中的线程和print()
函数定义如下,为简洁起见:
val mainThread = AndroidSchedulers.mainThread()
val backThread = Schedulers.computation()
private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")
如果有人觉得 rx java 描述难以理解(比如我),这里是纯粹的 java 解释:
subscribeOn()
Observable.just("something")
.subscribeOn(Schedulers.newThread())
.subscribe(...);
相当于:
Observable observable = Observable.just("something");
new Thread(() -> observable.subscribe(...)).start();
因为 Observable
在 subscribe()
上发出值,而这里 subscribe()
进入单独的线程,这些值也与 subscribe()
在同一线程中发出。这就是它工作的原因 "upstream" (影响之前操作的线程)和 "downstream".
observeOn()
Observable.just("something")
.observeOn(Schedulers.newThread())
.subscribe(...);
相当于:
Observable observable = Observable.just("something")
.subscribe(it -> new Thread(() -> ...).start());
这里Observable
在主线程中发出值,只有监听器方法在单独的线程中执行。
这个回答不是什么新鲜事,我只是想再澄清一点。
假设我们有两个线程。
val pool1 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 1") } val pool2 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 2") }
如答案所述,
observeOn
将设置Downstream
,而subscribeOn
将设置Upstream
。但是如果两者都用呢?为了检查这一点,我逐行添加了日志。Observable.just("what if use both") .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe A " + Thread.currentThread().name) } .doOnNext { Log.d("Thread", "both, doOnNext A " + Thread.currentThread().name) } .map { Log.d("Thread", "both, map A " + Thread.currentThread().name) it + " A" } // observeOn .observeOn(Schedulers.from(pool1)) .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe B " + Thread.currentThread().name) } .doOnNext { Log.d("Thread", "both, doOnNext B " + Thread.currentThread().name) } .map { Log.d("Thread", "both, map B " + Thread.currentThread().name) it + " B" } // subscribeOn .subscribeOn(Schedulers.from(pool2)) .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe C " + Thread.currentThread().name) } .doOnNext { Log.d("Thread", "both, doOnNext C " + Thread.currentThread().name) } .map { Log.d("Thread", "both, map C " + Thread.currentThread().name) it + " C" } // observeOn main .observeOn(AndroidSchedulers.mainThread()) .doOnNext { Log.d("Thread", "main " + Thread.currentThread().name) } .subscribe( { result -> Log.d("Thread", "main subscribe " + Thread.currentThread().name)} , { error -> {} } )
结果是:
both, doOnSubscribe C main
both, doOnSubscribe A Thread 2
both, doOnSubscribe B Thread 2
both, doOnNext A Thread 2
both, map A Thread 2
both, doOnNext B Thread 1
both, map B Thread 1
both, doOnNext C Thread 1
both, map C Thread 1
main main
main subscribe main
result: what if use both A B C
如您所见,doOnSubscribe
首先被调用,从下到上。这意味着 subscribe
优先于其他运算符,因此处理第一个代码的第一个线程是 线程 2.
然后一行一行地调用了其他算子。在 observeOn
之后,线程更改为 Thread 1
。然后,就在调用 subscribe
之前,再次调用 observeOn
以将线程更改为主线程。 (不用管AndroidSchedulers,它只是一种调度器)
TL;DR;
- 第一条路径,
subscribeOn
首先调用,从下到上。 - 第二条路径,
observeOn
调用,从上到下,以及其他代码。 - RxJava2 和 RxJava3 上的行为相同
当您订阅一个 observable 时,一个流程开始工作,它一直向上到链的顶部,然后再次向下。订阅部分与向上链接相关,观察部分与向下链接相关。
一旦到达链顶,订阅阶段就基本完成了。事件开始发出,映射、过滤器等的向下链被调用。
SubscribeOn 影响 subscription 调用,例如 doOnSubscribe。
ObserveOn 影响 observation 低于其位置的调用,例如 doOnNext、map、flatmap 等
两者都会改变用于继续向上或向下流动的线程。
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
public class SubscribeVsObserveOn {
public static void main(String[] args) throws InterruptedException {
System.out.println("Ordinal 0: " + Thread.currentThread().getName());
final CountDownLatch latch = new CountDownLatch(1);
Observable
.just("a regular string.")
.doOnSubscribe(disposable ->
System.out.println("Ordinal 2: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.doOnNext(s ->
System.out.println("Ordinal 3: " + Thread.currentThread().getName()))
.map(s -> s)
.doOnSubscribe(disposable ->
System.out.println("Ordinal 1: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.doOnNext(s ->
System.out.println("Ordinal 4: " + Thread.currentThread().getName()))
.map(s -> s)
.subscribe(s -> latch.countDown());
latch.await();
}
}
这是输出:
Ordinal 0: main
Ordinal 1: RxNewThreadScheduler-1
Ordinal 2: RxNewThreadScheduler-2
Ordinal 3: RxNewThreadScheduler-3
Ordinal 4: RxNewThreadScheduler-4