主线程中的 SubscribeOn 和 observeOn
SubscribeOn and observeOn in the main thread
我正在尝试将 subscribeOn 和 obsereOn 与 Executor 一起使用,以便在异步任务完成后让我返回主线程。
我最终得到了这段代码,但它不起作用
@Test
public void testBackToMainThread() throws InterruptedException {
processValue(1);
processValue(2);
processValue(3);
processValue(4);
processValue(5);
// while (tasks.size() != 0) {
// tasks.take().run();
// }
System.out.println("done");
}
private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
private void processValue(int value) throws InterruptedException {
Observable.just(value)
.subscribeOn(Schedulers.io())
.doOnNext(number -> processExecution())
.observeOn(Schedulers.from(command -> tasks.add(command)))
.subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
tasks.take().run();
}
private void processExecution() {
System.out.println("Execution in " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
知道如何实现我想要的吗?
当我运行我只打印
Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done
此致
你的问题不会出现在RxJava2中。推荐使用RxJava2.
我对比了RxJava-1.2.7和RxJava-2.0.7,找到了根本原因。现在我正在寻找解决方案。
在 RxJava-1.2.7.You 中可以看到 ObservableObserveOn#145
并在调用 request
时找到 schedule
任务。这意味着当你订阅它时它会调用 Executor.execute
。因此,您的任务队列会立即接受 Runnable
。然后你拿 运行 Runnable
(这是实际的 ExecutorSchedulerWorker
)但是上游的 onNext
还没有被调用(因为你睡了 2000 毫秒)。它将在 ObserveOnSubscriber#213
return null
。当上游调用onNext(Integer)
时,任务永远不会是运行.
你的方法的问题是你不知道在给定的时间应该执行多少任务,也不会在你解除主线程阻塞后等待应该发生的任务时死锁。
据我所知,1.x 的任何扩展都不支持返回 Java 主线程。对于 2.x,扩展项目中的 BlockingScheduler 允许您这样做:
public static void main(String[] args) {
BlockingScheduler scheduler = new BlockingScheduler();
scheduler.execute(() -> {
Flowable.range(1,10)
.subscribeOn(Schedulers.io())
.observeOn(scheduler)
.doAfterTerminate(() -> scheduler.shutdown())
.subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
});
System.out.println("BlockingScheduler finished");
}
注意对 scheduler.shutdown()
的调用,最终必须调用它来释放主线程,否则您的程序可能永远不会终止。
我只是根据 akanord 的建议更新了我的代码,但这种方法似乎将一个任务阻塞到另一个任务,并且最终 运行ning 顺序。
使用代码:
@Test
public void testBackToMainThread() throws InterruptedException {
processValue(1);
processValue(2);
processValue(3);
processValue(4);
processValue(5);
System.out.println("done");
}
private void processValue(int value) throws InterruptedException {
BlockingScheduler scheduler = new BlockingScheduler();
scheduler.execute(() -> Flowable.just(value)
.subscribeOn(Schedulers.io())
.doOnNext(number -> processExecution())
.observeOn(scheduler)
.doAfterTerminate(() -> scheduler.shutdown())
.subscribe(v -> System.out.println(v + " on " + Thread.currentThread())));
}
private void processExecution() {
System.out.println("Execution in " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
和输出
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
2 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
3 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
4 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
5 on Thread[main,5,main]
done
我要实现的就是这个输出
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
2 on Thread[main,5,main]
3 on Thread[main,5,main]
4 on Thread[main,5,main]
5 on Thread[main,5,main]
done
所以每次主线程 运行 管道 运行 另一个线程中的 onNext 然后它 return 从方法直到另一个线程完成并使其成为主线程回到管道。
我正在尝试将 subscribeOn 和 obsereOn 与 Executor 一起使用,以便在异步任务完成后让我返回主线程。 我最终得到了这段代码,但它不起作用
@Test
public void testBackToMainThread() throws InterruptedException {
processValue(1);
processValue(2);
processValue(3);
processValue(4);
processValue(5);
// while (tasks.size() != 0) {
// tasks.take().run();
// }
System.out.println("done");
}
private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
private void processValue(int value) throws InterruptedException {
Observable.just(value)
.subscribeOn(Schedulers.io())
.doOnNext(number -> processExecution())
.observeOn(Schedulers.from(command -> tasks.add(command)))
.subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
tasks.take().run();
}
private void processExecution() {
System.out.println("Execution in " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
知道如何实现我想要的吗?
当我运行我只打印
Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done
此致
你的问题不会出现在RxJava2中。推荐使用RxJava2.
我对比了RxJava-1.2.7和RxJava-2.0.7,找到了根本原因。现在我正在寻找解决方案。
在 RxJava-1.2.7.You 中可以看到 ObservableObserveOn#145
并在调用 request
时找到 schedule
任务。这意味着当你订阅它时它会调用 Executor.execute
。因此,您的任务队列会立即接受 Runnable
。然后你拿 运行 Runnable
(这是实际的 ExecutorSchedulerWorker
)但是上游的 onNext
还没有被调用(因为你睡了 2000 毫秒)。它将在 ObserveOnSubscriber#213
return null
。当上游调用onNext(Integer)
时,任务永远不会是运行.
你的方法的问题是你不知道在给定的时间应该执行多少任务,也不会在你解除主线程阻塞后等待应该发生的任务时死锁。
据我所知,1.x 的任何扩展都不支持返回 Java 主线程。对于 2.x,扩展项目中的 BlockingScheduler 允许您这样做:
public static void main(String[] args) {
BlockingScheduler scheduler = new BlockingScheduler();
scheduler.execute(() -> {
Flowable.range(1,10)
.subscribeOn(Schedulers.io())
.observeOn(scheduler)
.doAfterTerminate(() -> scheduler.shutdown())
.subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
});
System.out.println("BlockingScheduler finished");
}
注意对 scheduler.shutdown()
的调用,最终必须调用它来释放主线程,否则您的程序可能永远不会终止。
我只是根据 akanord 的建议更新了我的代码,但这种方法似乎将一个任务阻塞到另一个任务,并且最终 运行ning 顺序。
使用代码:
@Test
public void testBackToMainThread() throws InterruptedException {
processValue(1);
processValue(2);
processValue(3);
processValue(4);
processValue(5);
System.out.println("done");
}
private void processValue(int value) throws InterruptedException {
BlockingScheduler scheduler = new BlockingScheduler();
scheduler.execute(() -> Flowable.just(value)
.subscribeOn(Schedulers.io())
.doOnNext(number -> processExecution())
.observeOn(scheduler)
.doAfterTerminate(() -> scheduler.shutdown())
.subscribe(v -> System.out.println(v + " on " + Thread.currentThread())));
}
private void processExecution() {
System.out.println("Execution in " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
和输出
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
2 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
3 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
4 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
5 on Thread[main,5,main]
done
我要实现的就是这个输出
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
2 on Thread[main,5,main]
3 on Thread[main,5,main]
4 on Thread[main,5,main]
5 on Thread[main,5,main]
done
所以每次主线程 运行 管道 运行 另一个线程中的 onNext 然后它 return 从方法直到另一个线程完成并使其成为主线程回到管道。