使用 RxJava 将并行任务列表的结果组合到单个 HashMap

Combine the result of a list of parallel tasks to a single HashMap using RxJava

我有一个 List<Task>,其中 Task 是一个具有单一方法的接口,return 是 Map<String, JsonElement>。如何并行执行 List<Task> 和 return 一个新的 HashMap 以及每个 Task 的组合结果?

我目前有这个:

List<Task> tasks = getTasks();

Observable.from(tasks)
    .flatMap(new Func1<Task, Observable<Map<String, JsonElement>>>() {
        @Override
        public Observable<Map<String, JsonElement>> call(Task task) {
            return Observable.just(task.get());
        }
    });

// group into single Map<String,JsonElement>
// create Observable<Map<String,JsonElement>> with all results

使用defer封装每个订阅的Map和一个Scheduler基于你想要的线程池大小:

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.defer(() -> {
  final Map<String, JsonElement> map = new ConcurrentHashMap<>();
  return Observable
    .from(tasks)
    .flatMap(task -> 
      Observable
        .fromCallable(task -> task.get())
        .doOnNext(mp -> map.putAll(mp)) 
        .subscribeOn(scheduler))
    .ignoreElements()
    .concatWith(Observable.just(map));
});

请注意,Scheduler 的选择将取决于所执行任务的性质。如果 CPU 占主导地位,您可能会对 Schedulers.computation() 感到满意。