RxJava .subscribeOn(Schedulers.newThread()) 问题
RxJava .subscribeOn(Schedulers.newThread()) questions
我很简单 JDK 8. 我有这个简单的 RxJava 示例:
Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));
它逐行打印出单词,与有关线程的信息交织在一起,正如预期的那样,对于所有下一次调用都是 'main'。
但是,当我取消对 subscribeOn(Schedulers.newThread())
调用的注释时,根本不会打印任何内容。为什么它不起作用?我原以为它会为每个 onNext()
调用启动一个新线程,并且 doOnNext()
会打印该线程的名称。现在,我什么也看不到,其他调度程序也是如此。
当我在 main 的末尾添加对 Thread.sleep(10000L)
的调用时,我可以看到输出,这表明 RxJava 使用的线程都是守护进程。是这样吗?这可以以某种方式改变,但使用自定义 ThreadFactory 或类似概念,而不必实现自定义调度程序吗?
通过上述更改,线程名称始终为 RxNewThreadScheduler-1
,而 newThread 的文档显示为 "Scheduler that creates a new {@link Thread}
for each unit of work"。不是应该为所有排放创建一个新线程吗?
正如 Vladimir 提到的,RxJava 标准调度程序 运行 在守护线程上工作,在您的示例中由于主线程退出而终止。我想强调的是,他们不会在新线程上安排每个值,但他们会在新创建的线程上为每个单独的订阅者安排值流。第二次订阅会给你 "RxNewThreadScheduler-2".
您实际上不需要更改默认调度程序,只需使用 Schedulers.from() 包装您自己的基于执行器的调度程序,并在需要时将其作为参数提供:
ThreadPoolExecutor exec = new ThreadPoolExecutor(
0, 64, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
exec.allowCoreThreadTimeOut(true);
Scheduler s = Schedulers.from(exec);
Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
Thread.currentThread().getName()))
.subscribeOn(s)
.subscribe(word -> System.out.println(word));
我有一系列关于 RxJava 调度程序的 blog posts 应该可以帮助您实现 "more permanent" 变体。
与新手的看法相反,反应流不是天生并发的,而是天生异步的。它们本质上也是顺序的,并且必须在流中配置并发。简而言之,反应流在它们的末端自然是顺序的,但在它们的核心可以是并发的。
秘诀是在流中使用 flatMap() 运算符。此运算符从源流中获取 Observable 输入,并在内部将其作为 Observable> 流重新发出它也订阅了所有实例。只要 flatMap() 内部流在多线程上下文中执行,它就会同时执行应用您的逻辑的提供的 Function,最后,重新在原始流上发出结果,因为它是自己的排放物。
这听起来很复杂(乍一看有点复杂),但是带有解释的简单示例有助于理解这个概念。
从类似问题 and articles on RxJava2 Schedulers and Concurrency 中找到更多详细信息,其中包含代码示例以及有关如何顺序和并发使用调度程序的详细说明。
希望这对您有所帮助,
软杰克
public class MainClass {
public static void main(String[] args) {
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10, Executors.defaultThreadFactory()));
Observable.interval(1,TimeUnit.SECONDS)
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
Thread.currentThread().getName()))
.subscribeOn(scheduler)
.observeOn(Schedulers.io())
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
Thread.currentThread().getName()))
.subscribe();
}
}
我很简单 JDK 8. 我有这个简单的 RxJava 示例:
Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));
它逐行打印出单词,与有关线程的信息交织在一起,正如预期的那样,对于所有下一次调用都是 'main'。
但是,当我取消对 subscribeOn(Schedulers.newThread())
调用的注释时,根本不会打印任何内容。为什么它不起作用?我原以为它会为每个 onNext()
调用启动一个新线程,并且 doOnNext()
会打印该线程的名称。现在,我什么也看不到,其他调度程序也是如此。
当我在 main 的末尾添加对 Thread.sleep(10000L)
的调用时,我可以看到输出,这表明 RxJava 使用的线程都是守护进程。是这样吗?这可以以某种方式改变,但使用自定义 ThreadFactory 或类似概念,而不必实现自定义调度程序吗?
通过上述更改,线程名称始终为 RxNewThreadScheduler-1
,而 newThread 的文档显示为 "Scheduler that creates a new {@link Thread}
for each unit of work"。不是应该为所有排放创建一个新线程吗?
正如 Vladimir 提到的,RxJava 标准调度程序 运行 在守护线程上工作,在您的示例中由于主线程退出而终止。我想强调的是,他们不会在新线程上安排每个值,但他们会在新创建的线程上为每个单独的订阅者安排值流。第二次订阅会给你 "RxNewThreadScheduler-2".
您实际上不需要更改默认调度程序,只需使用 Schedulers.from() 包装您自己的基于执行器的调度程序,并在需要时将其作为参数提供:
ThreadPoolExecutor exec = new ThreadPoolExecutor(
0, 64, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
exec.allowCoreThreadTimeOut(true);
Scheduler s = Schedulers.from(exec);
Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
Thread.currentThread().getName()))
.subscribeOn(s)
.subscribe(word -> System.out.println(word));
我有一系列关于 RxJava 调度程序的 blog posts 应该可以帮助您实现 "more permanent" 变体。
与新手的看法相反,反应流不是天生并发的,而是天生异步的。它们本质上也是顺序的,并且必须在流中配置并发。简而言之,反应流在它们的末端自然是顺序的,但在它们的核心可以是并发的。
秘诀是在流中使用 flatMap() 运算符。此运算符从源流中获取 Observable
这听起来很复杂(乍一看有点复杂),但是带有解释的简单示例有助于理解这个概念。
从类似问题
希望这对您有所帮助,
软杰克
public class MainClass {
public static void main(String[] args) {
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10, Executors.defaultThreadFactory()));
Observable.interval(1,TimeUnit.SECONDS)
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
Thread.currentThread().getName()))
.subscribeOn(scheduler)
.observeOn(Schedulers.io())
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
Thread.currentThread().getName()))
.subscribe();
}
}