RxJava 在 calling/subscribing 线程上观察
RxJava Observing on calling/subscribing thread
我在理解 subscribeOn/observeOn 如何在 RxJava 中工作时遇到一些问题。我已经创建了带有可观察到的简单应用程序,它发出太阳系行星名称,进行一些映射和过滤并打印结果。
据我所知,调度后台线程的工作是通过 subscribeOn
运算符完成的(而且它似乎工作正常)。
在后台线程上观察也可以与 observeOn
运算符一起正常工作。
但是我很难理解如何观察调用线程(无论是主线程还是其他线程)。使用 AndroidSchedulers.mainThread()
运算符可以在 Android 上轻松完成,但我不知道如何在纯 java.
中实现
这是我的代码:
public class Main {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
System.out.println("Main thread: " + getCurrentThreadInfo());
Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
.map(in -> {
System.out.println("map on: " + getCurrentThreadInfo());
return in.toUpperCase();
})
.filter(in -> {
System.out.println("filter on: " + getCurrentThreadInfo());
return in.contains("A");
})
.subscribeOn(Schedulers.from(executor));
for (int i = 0; i < 5; i++) {
Thread thread = new Thread("Thread-" + i) {
@Override
public void run() {
stringObservable
.buffer(5)
.subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
}
};
thread.start();
}
}
private static String getCurrentThreadInfo() {
return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
}
}
Observable in created 和 work 从执行者的三个线程之一订阅。这按预期工作。但是如何在for循环中观察那些动态创建的线程的结果呢?有没有办法从当前线程创建调度程序?
此外,我发现在 运行 这段代码之后,它永远不会终止,我不知道为什么? :(
为了回答你的问题,让我从头开始,这样可以让其他人理解你已经知道的东西。
调度程序
调度程序与 Java 的执行程序扮演相同的角色。简而言之 - 他们决定执行哪些线程操作。
通常一个 Observable 和操作符在当前线程中执行。有时您可以将 Scheduler 作为参数传递给 Observable 或运算符(例如 Observable.timer()).
另外RxJava提供了2个操作符来指定调度器:
- subscribeOn - 指定 Observable 运行的调度器
- observeOn - 指定观察者将观察此 Observable 的调度器
为了快速理解它们,我使用了示例代码:
在所有示例中,我将使用 helper createObservable,它发出 Observable 运行的线程的名称:
public static Observable<String> createObservable(){
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(Thread.currentThread().getName());
subscriber.onCompleted();
}
);
}
没有调度程序:
createObservable().subscribe(message -> {
System.out.println("Case 1 Observable thread " + message);
System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 1 Observable thread main
//Case 1 Observer thread main
订阅:
createObservable()
.subscribeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 2 Observable thread " + message);
System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 2 Observable thread RxNewThreadScheduler-1
//Case 2 Observer thread RxNewThreadScheduler-1
订阅和观察:
reateObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 3 Observable thread " + message);
System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 3 Observable thread RxNewThreadScheduler-2
//Case 3 Observer thread RxNewThreadScheduler-1
观察:
createObservable()
.observeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 4 Observable thread " + message);
System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 4 Observable thread main
//Case 4 Observer thread RxNewThreadScheduler-1
答案:
AndroidSchedulers.mainThread() returns 将工作委托给与主线程关联的 MessageQueue 的调度器。
为此,它使用 android.os.Looper.getMainLooper() 和 android.os.Handler.
换句话说,如果你想指定特定的线程,你必须提供在线程上调度和执行任务的方法。
在它下面可以使用任何类型的 MQ 来存储任务和循环 Quee 并执行任务的逻辑。
在java中,我们有专为此类任务指定的Executor。 RxJava 可以轻松地从这样的 Executor 创建 Scheduler。
下面的示例展示了如何在主线程上进行观察(不是特别有用,但显示了所有必需的部分)。
public class RunCurrentThread implements Executor {
private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
public static void main(String[] args) throws InterruptedException {
RunCurrentThread sample = new RunCurrentThread();
sample.observerOnMain();
sample.runLoop();
}
private void observerOnMain() {
createObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.from(this))
.subscribe(message -> {
System.out.println("Observable thread " + message);
System.out.println("Observer thread " + Thread.currentThread().getName());
});
;
}
public Observable<String> createObservable() {
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(Thread.currentThread().getName());
subscriber.onCompleted();
}
);
}
private void runLoop() throws InterruptedException {
while(!Thread.interrupted()){
tasks.take().run();
}
}
@Override
public void execute(Runnable command) {
tasks.add(command);
}
}
最后一个问题,为什么你的代码没有终止:
ThreadPoolExecutor 默认使用非守护线程,因此您的程序在它们存在之前不会结束。
您应该使用 shutdown 方法关闭线程。
这里是为 RxJava 2 更新的简化示例。它与 Marek 的回答具有相同的概念:将可运行对象添加到调用者线程正在使用的 BlockingQueue 的执行器。
public class ThreadTest {
@Test
public void test() throws InterruptedException {
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
System.out.println("Caller thread: " + Thread.currentThread().getName());
Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Observable thread: " + Thread.currentThread().getName());
return 1;
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(new Executor() {
@Override
public void execute(@NonNull Runnable runnable) {
tasks.add(runnable);
}
}))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.println("Observer thread: " + Thread.currentThread().getName());
}
});
tasks.take().run();
}
}
// Output:
// Caller thread main
// Observable thread RxCachedThreadScheduler-1
// Observer thread main
我在理解 subscribeOn/observeOn 如何在 RxJava 中工作时遇到一些问题。我已经创建了带有可观察到的简单应用程序,它发出太阳系行星名称,进行一些映射和过滤并打印结果。
据我所知,调度后台线程的工作是通过 subscribeOn
运算符完成的(而且它似乎工作正常)。
在后台线程上观察也可以与 observeOn
运算符一起正常工作。
但是我很难理解如何观察调用线程(无论是主线程还是其他线程)。使用 AndroidSchedulers.mainThread()
运算符可以在 Android 上轻松完成,但我不知道如何在纯 java.
这是我的代码:
public class Main {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
System.out.println("Main thread: " + getCurrentThreadInfo());
Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
.map(in -> {
System.out.println("map on: " + getCurrentThreadInfo());
return in.toUpperCase();
})
.filter(in -> {
System.out.println("filter on: " + getCurrentThreadInfo());
return in.contains("A");
})
.subscribeOn(Schedulers.from(executor));
for (int i = 0; i < 5; i++) {
Thread thread = new Thread("Thread-" + i) {
@Override
public void run() {
stringObservable
.buffer(5)
.subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
}
};
thread.start();
}
}
private static String getCurrentThreadInfo() {
return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
}
}
Observable in created 和 work 从执行者的三个线程之一订阅。这按预期工作。但是如何在for循环中观察那些动态创建的线程的结果呢?有没有办法从当前线程创建调度程序?
此外,我发现在 运行 这段代码之后,它永远不会终止,我不知道为什么? :(
为了回答你的问题,让我从头开始,这样可以让其他人理解你已经知道的东西。
调度程序
调度程序与 Java 的执行程序扮演相同的角色。简而言之 - 他们决定执行哪些线程操作。
通常一个 Observable 和操作符在当前线程中执行。有时您可以将 Scheduler 作为参数传递给 Observable 或运算符(例如 Observable.timer()).
另外RxJava提供了2个操作符来指定调度器:
- subscribeOn - 指定 Observable 运行的调度器
- observeOn - 指定观察者将观察此 Observable 的调度器
为了快速理解它们,我使用了示例代码:
在所有示例中,我将使用 helper createObservable,它发出 Observable 运行的线程的名称:
public static Observable<String> createObservable(){
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(Thread.currentThread().getName());
subscriber.onCompleted();
}
);
}
没有调度程序:
createObservable().subscribe(message -> {
System.out.println("Case 1 Observable thread " + message);
System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 1 Observable thread main
//Case 1 Observer thread main
订阅:
createObservable()
.subscribeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 2 Observable thread " + message);
System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 2 Observable thread RxNewThreadScheduler-1
//Case 2 Observer thread RxNewThreadScheduler-1
订阅和观察:
reateObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 3 Observable thread " + message);
System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 3 Observable thread RxNewThreadScheduler-2
//Case 3 Observer thread RxNewThreadScheduler-1
观察:
createObservable()
.observeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 4 Observable thread " + message);
System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 4 Observable thread main
//Case 4 Observer thread RxNewThreadScheduler-1
答案:
AndroidSchedulers.mainThread() returns 将工作委托给与主线程关联的 MessageQueue 的调度器。
为此,它使用 android.os.Looper.getMainLooper() 和 android.os.Handler.
换句话说,如果你想指定特定的线程,你必须提供在线程上调度和执行任务的方法。
在它下面可以使用任何类型的 MQ 来存储任务和循环 Quee 并执行任务的逻辑。
在java中,我们有专为此类任务指定的Executor。 RxJava 可以轻松地从这样的 Executor 创建 Scheduler。
下面的示例展示了如何在主线程上进行观察(不是特别有用,但显示了所有必需的部分)。
public class RunCurrentThread implements Executor {
private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
public static void main(String[] args) throws InterruptedException {
RunCurrentThread sample = new RunCurrentThread();
sample.observerOnMain();
sample.runLoop();
}
private void observerOnMain() {
createObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.from(this))
.subscribe(message -> {
System.out.println("Observable thread " + message);
System.out.println("Observer thread " + Thread.currentThread().getName());
});
;
}
public Observable<String> createObservable() {
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(Thread.currentThread().getName());
subscriber.onCompleted();
}
);
}
private void runLoop() throws InterruptedException {
while(!Thread.interrupted()){
tasks.take().run();
}
}
@Override
public void execute(Runnable command) {
tasks.add(command);
}
}
最后一个问题,为什么你的代码没有终止:
ThreadPoolExecutor 默认使用非守护线程,因此您的程序在它们存在之前不会结束。 您应该使用 shutdown 方法关闭线程。
这里是为 RxJava 2 更新的简化示例。它与 Marek 的回答具有相同的概念:将可运行对象添加到调用者线程正在使用的 BlockingQueue 的执行器。
public class ThreadTest {
@Test
public void test() throws InterruptedException {
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
System.out.println("Caller thread: " + Thread.currentThread().getName());
Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Observable thread: " + Thread.currentThread().getName());
return 1;
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(new Executor() {
@Override
public void execute(@NonNull Runnable runnable) {
tasks.add(runnable);
}
}))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.println("Observer thread: " + Thread.currentThread().getName());
}
});
tasks.take().run();
}
}
// Output:
// Caller thread main
// Observable thread RxCachedThreadScheduler-1
// Observer thread main