如何聚合在循环中进行 CompletableFuture 调用的结果?

How to aggregate results from making CompletableFuture calls in a loop?

我正在学习并尝试将 CompletableFuture 应用于我的问题陈述。我有一个要迭代的项目列表。

Prop 是一个 class,只有两个属性 prop1 和 prop2,分别是 getter 和 setter。

List<Prop> result = new ArrayList<>(); 

for ( Item item : items ) {
      item.load();

      Prop temp = new Prop();
      // once the item is loaded, get its properties
      temp.setProp1(item.getProp1());
      temp.setProp2(item.getProp2());

      result.add(temp);
}

return result;

然而,这里的item.load()是一个阻塞调用。所以,我正在考虑使用如下所示的 CompletableFuture -

for (Item item : items) {
    CompletableFuture<Prop> prop = CompletableFuture.supplyAsync(() -> {
        try {
            item.load();
            return item;
        } catch (Exception e) {
            logger.error("Error");
            return null;
        }
    }).thenApply(item1 -> {
        try {
            Prop temp = new Prop();
            // once the item is loaded, get its properties
            temp.setProp1(item.getProp1());
            temp.setProp2(item.getProp2());

            return temp;
        } catch (Exception e) {
        }
    });
}

但我不确定如何等待所有项目加载,然后聚合并return它们的结果。

我在实现 CompletableFutures 的方式上可能完全错误,因为这是我的第一次尝试。请原谅任何错误。在此先感谢您的帮助。

您使用 CompletableFuture 的方法有两个问题。

首先,你说 item.load() 是一个阻塞调用,所以 CompletableFuture 的默认执行器不适合它,因为它试图达到与 CPU 核心。您可以通过将不同的 Executor 传递给 CompletableFuture 的异步方法来解决此问题,但是您的 load() 方法不会 return 后续操作所依赖的值。所以使用 CompletableFuture 会使设计复杂化而没有任何好处。

您可以异步执行 load() 调用并等待它们完成,只需使用 ExecutorService,然后按原样循环(没有已经执行的 load() 操作,课程):

ExecutorService es = Executors.newCachedThreadPool();
es.invokeAll(items.stream()
    .map(i -> Executors.callable(i::load))
    .collect(Collectors.toList()));
es.shutdown();

List<Prop> result = new ArrayList<>(); 

for(Item item : items) {
      Prop temp = new Prop();
      // once the item is loaded, get its properties
      temp.setProp1(item.getProp1());
      temp.setProp2(item.getProp2());

      result.add(temp);
}

return result;

您可以通过选择执行器来控制并行度,例如您可以使用 Executors.newFixedThreadPool(numberOfThreads) 而不是无限线程池。