rxjava2 - 在线程池上执行任务的简单示例,在单个线程上订阅

rxjava2 - simple example of executing tasks on a thread pool, subscribing on a single thread

我正在尝试以下任务来了解 RxJava:

所以我在 Kotlin 中尝试了一下:

val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
    .observeOn(Schedulers.from(ex))
    .map { Thread.currentThread().name }
    .subscribe { println(it + " " + Thread.currentThread().name }

我希望它能打印出来

pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....

然而它打印:

pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1

任何人都可以纠正我对它如何工作的误解吗?为什么它不使用线程池的所有线程?如何让我的订阅者在主线程上进入 运行 或阻塞直到完成?

Rx 不是并行执行服务,为此使用 Java 的流 api。 Rx 事件是同步的,随后会流过流。在构建流时,observeOn 将请求一个线程一次,并在该线程上一个一个地处理发射。

您还希望 subscribe 在主线程上执行。 observeOn 切换线程,所有下游事件都发生在该线程上。如果要切换到主线程,则必须在 subscribe 之前插入另一个 observeOn

要使您的 map 块中的代码并行工作,您应该使用自己的调度程序将其包装为可观察的:

val ex = Executors.newFixedThreadPool(10)
    val scheduler = Schedulers.from(ex)
    Observable.fromIterable((1..100).toList())
            .flatMap {
                Observable
                        .fromCallable { Thread.currentThread().name }
                        .subscribeOn(scheduler)
            }
            .subscribe { println(it + " " + Thread.currentThread().name) }

在这种情况下,您将看到结果:

pool-1-thread-1 pool-1-thread-1
pool-1-thread-2 pool-1-thread-1
pool-1-thread-3 pool-1-thread-1
pool-1-thread-4 pool-1-thread-1
...

您可以查看解释此行为的文章 RxJava - Achieving Parallelization

另外,引入了 RxJava 2.0.5 ParallelFlowable API