为什么 RxJava 在异步处理时只使用大约 10 个线程?
How come RxJava uses only ~10 threads when processing asynchronously?
考虑以下代码,我试图让 Observables 异步地达到 运行。
try {
DateTime now = DateTime.now();
Observable
.from(map.entrySet()).subscribeOn(Schedulers.from(new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 10)))
.flatMap(Async.toAsync((Map.Entry<String, Info> entry) -> {
// processing work, makes multiple http requests for ref data
}))
.doOnCompleted(() -> System.out.println("completed yo...."))
.doOnError(Throwable::printStackTrace)
.toList()
.timeout(1, TimeUnit.MINUTES)
.toBlocking()
.single()
;
logger.info(now.toString());
logger.info(DateTime.now().toString());
saveToFile(gson.toJson(setForRx));
} catch (Exception e) {
e.printStackTrace();
}
输出显示它使用相同的 ~10 个线程进行处理,我该如何增加它?
示例输出:
INFO 2015-06-29 15:11:20,524 [rxjava.ConcurrentRxJava] RxComputationThreadPool-3
INFO 2015-06-29 15:11:20,526 [rxjava.ConcurrentRxJava] RxComputationThreadPool-6
INFO 2015-06-29 15:11:20,542 [rxjava.ConcurrentRxJava] RxComputationThreadPool-4
INFO 2015-06-29 15:11:20,546 [rxjava.ConcurrentRxJava] RxComputationThreadPool-7
INFO 2015-06-29 15:11:20,571 [rxjava.ConcurrentRxJava] RxComputationThreadPool-2
INFO 2015-06-29 15:11:20,694 [rxjava.ConcurrentRxJava] RxComputationThreadPool-1
INFO 2015-06-29 15:11:20,920 [rxjava.ConcurrentRxJava] RxComputationThreadPool-8
INFO 2015-06-29 15:11:21,035 [rxjava.ConcurrentRxJava] RxComputationThreadPool-7
INFO 2015-06-29 15:11:21,039 [rxjava.ConcurrentRxJava] RxComputationThreadPool-4
INFO 2015-06-29 15:11:21,055 [rxjava.ConcurrentRxJava] RxComputationThreadPool-5
INFO 2015-06-29 15:11:21,081 [rxjava.ConcurrentRxJava] RxComputationThreadPool-3
INFO 2015-06-29 15:11:21,094 [rxjava.ConcurrentRxJava] RxComputationThreadPool-6
INFO 2015-06-29 15:11:21,118 [rxjava.ConcurrentRxJava] RxComputationThreadPool-2
在我的执行程序版本中,使用 Runtime.getRuntime().availableProcessors() * 10
,我得到大小为 80 的池。 RxJava 可以吗?
默认情况下,toAsync()
在具有固定线程数的 computation()
调度程序上运行。有一个重载需要一个调度程序,因此您应该将 Schedulers.from(...)
重构为局部变量并将该变量传递给 toAsync()
.
考虑以下代码,我试图让 Observables 异步地达到 运行。
try {
DateTime now = DateTime.now();
Observable
.from(map.entrySet()).subscribeOn(Schedulers.from(new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 10)))
.flatMap(Async.toAsync((Map.Entry<String, Info> entry) -> {
// processing work, makes multiple http requests for ref data
}))
.doOnCompleted(() -> System.out.println("completed yo...."))
.doOnError(Throwable::printStackTrace)
.toList()
.timeout(1, TimeUnit.MINUTES)
.toBlocking()
.single()
;
logger.info(now.toString());
logger.info(DateTime.now().toString());
saveToFile(gson.toJson(setForRx));
} catch (Exception e) {
e.printStackTrace();
}
输出显示它使用相同的 ~10 个线程进行处理,我该如何增加它?
示例输出:
INFO 2015-06-29 15:11:20,524 [rxjava.ConcurrentRxJava] RxComputationThreadPool-3
INFO 2015-06-29 15:11:20,526 [rxjava.ConcurrentRxJava] RxComputationThreadPool-6
INFO 2015-06-29 15:11:20,542 [rxjava.ConcurrentRxJava] RxComputationThreadPool-4
INFO 2015-06-29 15:11:20,546 [rxjava.ConcurrentRxJava] RxComputationThreadPool-7
INFO 2015-06-29 15:11:20,571 [rxjava.ConcurrentRxJava] RxComputationThreadPool-2
INFO 2015-06-29 15:11:20,694 [rxjava.ConcurrentRxJava] RxComputationThreadPool-1
INFO 2015-06-29 15:11:20,920 [rxjava.ConcurrentRxJava] RxComputationThreadPool-8
INFO 2015-06-29 15:11:21,035 [rxjava.ConcurrentRxJava] RxComputationThreadPool-7
INFO 2015-06-29 15:11:21,039 [rxjava.ConcurrentRxJava] RxComputationThreadPool-4
INFO 2015-06-29 15:11:21,055 [rxjava.ConcurrentRxJava] RxComputationThreadPool-5
INFO 2015-06-29 15:11:21,081 [rxjava.ConcurrentRxJava] RxComputationThreadPool-3
INFO 2015-06-29 15:11:21,094 [rxjava.ConcurrentRxJava] RxComputationThreadPool-6
INFO 2015-06-29 15:11:21,118 [rxjava.ConcurrentRxJava] RxComputationThreadPool-2
在我的执行程序版本中,使用 Runtime.getRuntime().availableProcessors() * 10
,我得到大小为 80 的池。 RxJava 可以吗?
默认情况下,toAsync()
在具有固定线程数的 computation()
调度程序上运行。有一个重载需要一个调度程序,因此您应该将 Schedulers.from(...)
重构为局部变量并将该变量传递给 toAsync()
.