rxjava2 - 在线程池上执行任务的简单示例,在单个线程上订阅
rxjava2 - simple example of executing tasks on a thread pool, subscribing on a single thread
我正在尝试以下任务来了解 RxJava:
- 给出 URLs
的列表
- 在线程池
上为每个 URL 做一个 HTTP 请求
- 对于每个结果,将一些数据插入 SQLite 数据库(这里没有多线程)
- 阻止该方法直到它完成
所以我在 Kotlin 中尝试了一下:
val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
.observeOn(Schedulers.from(ex))
.map { Thread.currentThread().name }
.subscribe { println(it + " " + Thread.currentThread().name }
我希望它能打印出来
pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....
然而它打印:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
任何人都可以纠正我对它如何工作的误解吗?为什么它不使用线程池的所有线程?如何让我的订阅者在主线程上进入 运行 或阻塞直到完成?
Rx 不是并行执行服务,为此使用 Java 的流 api。 Rx 事件是同步的,随后会流过流。在构建流时,observeOn 将请求一个线程一次,并在该线程上一个一个地处理发射。
您还希望 subscribe
在主线程上执行。 observeOn
切换线程,所有下游事件都发生在该线程上。如果要切换到主线程,则必须在 subscribe
之前插入另一个 observeOn
。
要使您的 map
块中的代码并行工作,您应该使用自己的调度程序将其包装为可观察的:
val ex = Executors.newFixedThreadPool(10)
val scheduler = Schedulers.from(ex)
Observable.fromIterable((1..100).toList())
.flatMap {
Observable
.fromCallable { Thread.currentThread().name }
.subscribeOn(scheduler)
}
.subscribe { println(it + " " + Thread.currentThread().name) }
在这种情况下,您将看到结果:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-2 pool-1-thread-1
pool-1-thread-3 pool-1-thread-1
pool-1-thread-4 pool-1-thread-1
...
您可以查看解释此行为的文章 RxJava - Achieving Parallelization。
另外,引入了 RxJava 2.0.5 ParallelFlowable API
我正在尝试以下任务来了解 RxJava:
- 给出 URLs 的列表
- 在线程池 上为每个 URL 做一个 HTTP 请求
- 对于每个结果,将一些数据插入 SQLite 数据库(这里没有多线程)
- 阻止该方法直到它完成
所以我在 Kotlin 中尝试了一下:
val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
.observeOn(Schedulers.from(ex))
.map { Thread.currentThread().name }
.subscribe { println(it + " " + Thread.currentThread().name }
我希望它能打印出来
pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....
然而它打印:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
任何人都可以纠正我对它如何工作的误解吗?为什么它不使用线程池的所有线程?如何让我的订阅者在主线程上进入 运行 或阻塞直到完成?
Rx 不是并行执行服务,为此使用 Java 的流 api。 Rx 事件是同步的,随后会流过流。在构建流时,observeOn 将请求一个线程一次,并在该线程上一个一个地处理发射。
您还希望 subscribe
在主线程上执行。 observeOn
切换线程,所有下游事件都发生在该线程上。如果要切换到主线程,则必须在 subscribe
之前插入另一个 observeOn
。
要使您的 map
块中的代码并行工作,您应该使用自己的调度程序将其包装为可观察的:
val ex = Executors.newFixedThreadPool(10)
val scheduler = Schedulers.from(ex)
Observable.fromIterable((1..100).toList())
.flatMap {
Observable
.fromCallable { Thread.currentThread().name }
.subscribeOn(scheduler)
}
.subscribe { println(it + " " + Thread.currentThread().name) }
在这种情况下,您将看到结果:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-2 pool-1-thread-1
pool-1-thread-3 pool-1-thread-1
pool-1-thread-4 pool-1-thread-1
...
您可以查看解释此行为的文章 RxJava - Achieving Parallelization。
另外,引入了 RxJava 2.0.5 ParallelFlowable API