如何聚合在循环中进行 CompletableFuture 调用的结果?
How to aggregate results from making CompletableFuture calls in a loop?
我正在学习并尝试将 CompletableFuture 应用于我的问题陈述。我有一个要迭代的项目列表。
Prop 是一个 class,只有两个属性 prop1 和 prop2,分别是 getter 和 setter。
List<Prop> result = new ArrayList<>();
for ( Item item : items ) {
item.load();
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
result.add(temp);
}
return result;
然而,这里的item.load()是一个阻塞调用。所以,我正在考虑使用如下所示的 CompletableFuture -
for (Item item : items) {
CompletableFuture<Prop> prop = CompletableFuture.supplyAsync(() -> {
try {
item.load();
return item;
} catch (Exception e) {
logger.error("Error");
return null;
}
}).thenApply(item1 -> {
try {
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
return temp;
} catch (Exception e) {
}
});
}
但我不确定如何等待所有项目加载,然后聚合并return它们的结果。
我在实现 CompletableFutures 的方式上可能完全错误,因为这是我的第一次尝试。请原谅任何错误。在此先感谢您的帮助。
您使用 CompletableFuture
的方法有两个问题。
首先,你说 item.load()
是一个阻塞调用,所以 CompletableFuture
的默认执行器不适合它,因为它试图达到与 CPU 核心。您可以通过将不同的 Executor
传递给 CompletableFuture
的异步方法来解决此问题,但是您的 load()
方法不会 return 后续操作所依赖的值。所以使用 CompletableFuture
会使设计复杂化而没有任何好处。
您可以异步执行 load()
调用并等待它们完成,只需使用 ExecutorService
,然后按原样循环(没有已经执行的 load()
操作,课程):
ExecutorService es = Executors.newCachedThreadPool();
es.invokeAll(items.stream()
.map(i -> Executors.callable(i::load))
.collect(Collectors.toList()));
es.shutdown();
List<Prop> result = new ArrayList<>();
for(Item item : items) {
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
result.add(temp);
}
return result;
您可以通过选择执行器来控制并行度,例如您可以使用 Executors.newFixedThreadPool(numberOfThreads)
而不是无限线程池。
我正在学习并尝试将 CompletableFuture 应用于我的问题陈述。我有一个要迭代的项目列表。
Prop 是一个 class,只有两个属性 prop1 和 prop2,分别是 getter 和 setter。
List<Prop> result = new ArrayList<>();
for ( Item item : items ) {
item.load();
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
result.add(temp);
}
return result;
然而,这里的item.load()是一个阻塞调用。所以,我正在考虑使用如下所示的 CompletableFuture -
for (Item item : items) {
CompletableFuture<Prop> prop = CompletableFuture.supplyAsync(() -> {
try {
item.load();
return item;
} catch (Exception e) {
logger.error("Error");
return null;
}
}).thenApply(item1 -> {
try {
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
return temp;
} catch (Exception e) {
}
});
}
但我不确定如何等待所有项目加载,然后聚合并return它们的结果。
我在实现 CompletableFutures 的方式上可能完全错误,因为这是我的第一次尝试。请原谅任何错误。在此先感谢您的帮助。
您使用 CompletableFuture
的方法有两个问题。
首先,你说 item.load()
是一个阻塞调用,所以 CompletableFuture
的默认执行器不适合它,因为它试图达到与 CPU 核心。您可以通过将不同的 Executor
传递给 CompletableFuture
的异步方法来解决此问题,但是您的 load()
方法不会 return 后续操作所依赖的值。所以使用 CompletableFuture
会使设计复杂化而没有任何好处。
您可以异步执行 load()
调用并等待它们完成,只需使用 ExecutorService
,然后按原样循环(没有已经执行的 load()
操作,课程):
ExecutorService es = Executors.newCachedThreadPool();
es.invokeAll(items.stream()
.map(i -> Executors.callable(i::load))
.collect(Collectors.toList()));
es.shutdown();
List<Prop> result = new ArrayList<>();
for(Item item : items) {
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
result.add(temp);
}
return result;
您可以通过选择执行器来控制并行度,例如您可以使用 Executors.newFixedThreadPool(numberOfThreads)
而不是无限线程池。