使用 RxJava 将并行任务列表的结果组合到单个 HashMap
Combine the result of a list of parallel tasks to a single HashMap using RxJava
我有一个 List<Task>
,其中 Task
是一个具有单一方法的接口,return 是 Map<String, JsonElement>
。如何并行执行 List<Task>
和 return 一个新的 HashMap
以及每个 Task
的组合结果?
我目前有这个:
List<Task> tasks = getTasks();
Observable.from(tasks)
.flatMap(new Func1<Task, Observable<Map<String, JsonElement>>>() {
@Override
public Observable<Map<String, JsonElement>> call(Task task) {
return Observable.just(task.get());
}
});
// group into single Map<String,JsonElement>
// create Observable<Map<String,JsonElement>> with all results
使用defer
封装每个订阅的Map
和一个Scheduler
基于你想要的线程池大小:
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.defer(() -> {
final Map<String, JsonElement> map = new ConcurrentHashMap<>();
return Observable
.from(tasks)
.flatMap(task ->
Observable
.fromCallable(task -> task.get())
.doOnNext(mp -> map.putAll(mp))
.subscribeOn(scheduler))
.ignoreElements()
.concatWith(Observable.just(map));
});
请注意,Scheduler
的选择将取决于所执行任务的性质。如果 CPU 占主导地位,您可能会对 Schedulers.computation()
感到满意。
我有一个 List<Task>
,其中 Task
是一个具有单一方法的接口,return 是 Map<String, JsonElement>
。如何并行执行 List<Task>
和 return 一个新的 HashMap
以及每个 Task
的组合结果?
我目前有这个:
List<Task> tasks = getTasks();
Observable.from(tasks)
.flatMap(new Func1<Task, Observable<Map<String, JsonElement>>>() {
@Override
public Observable<Map<String, JsonElement>> call(Task task) {
return Observable.just(task.get());
}
});
// group into single Map<String,JsonElement>
// create Observable<Map<String,JsonElement>> with all results
使用defer
封装每个订阅的Map
和一个Scheduler
基于你想要的线程池大小:
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.defer(() -> {
final Map<String, JsonElement> map = new ConcurrentHashMap<>();
return Observable
.from(tasks)
.flatMap(task ->
Observable
.fromCallable(task -> task.get())
.doOnNext(mp -> map.putAll(mp))
.subscribeOn(scheduler))
.ignoreElements()
.concatWith(Observable.just(map));
});
请注意,Scheduler
的选择将取决于所执行任务的性质。如果 CPU 占主导地位,您可能会对 Schedulers.computation()
感到满意。