Java 从多个调用中收集 CompletableFuture 的结果
Java collecting results of CompletableFuture from multiple calls
我要运行多次外部调用操作,然后以list的形式获取结果。
我决定使用 CompletableFuture
api,我准备的代码非常恶心:
例子:
public class Main {
public static void main(String[] args) {
String prefix = "collection_";
List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(
() -> callApi(name)))
.collect(Collectors.toList());
try {
CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
List<User> users = usersResult //the result I need
.stream()
.map(userCompletableFuture -> {
try {
return userCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static User callApi(String collection) {
return new User(); //potentially time-consuming operation
}
}
我有以下问题:
- 我能否以某种方式避免在流中复制
try-catch
块,我在其中将 CompletableFuture 映射到用户?
- 此代码能否减少顺序性(如何避免等待所有期货完成?)
这样可以吗(会不会所有的future都在stream中解决?):
public class Main {
public static void main(String[] args) {
String prefix = "collection_";
List<User> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(
() -> callApi(name)))
.filter(Objects::nonNull)
.map(userCompletableFuture -> {
try {
return userCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
})
.collect(Collectors.toList());
}
private static User callApi(String collection) {
return new User(); //potentially time-consuming operation
}
}
对于 1.,您可以完全跳过 allOf().get()
调用,因为您无论如何都在一个接一个地等待所有期货。¹
对于 2.,您可以通过执行以下操作来简化 try-catch
:
- 以后使用
exceptionally()
直接处理异常;
- 使用
join()
而不是 get()
来避免检查异常(而且你知道不可能有异常)。
对于 3.,您不能真正减少顺序,因为您至少需要以下步骤:创建所有期货,然后处理它们的结果。
如果你在一个流中做所有事情,它会创建每个未来,然后在创建下一个之前立即等待它——所以你会失去并行性。您可以改用并行流,但是使用 CompletableFuture
s.
不会有太大好处
所以最后的代码是:
List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
.exceptionally(e -> {
e.printStackTrace();
return null;
}))
.collect(Collectors.toList());
List<User> users = usersResult
.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
¹ 请注意,如果您希望结果也是 CompletableFuture<List<User>>
,则仍然需要 allOf()
调用,例如
final CompletableFuture<List<User>> result =
CompletableFuture.allOf(usersResult.stream().toArray(CompletableFuture[]::new))
.thenApply(__ -> usersResult
.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
或者,您可以删除 CompletableFuture
并使用 parallelStream()
,如 Didier 所述:
Optional<User> wrapApiCall(String name) {
try { return Optional.of(callApi(name)); }
catch (Exception e) {
e.printStackTrace();
return Optional.empty();
}
}
List<User> usersResult = IntStream.range(1, 10)
.boxed()
.parallelStream()
.map(num -> String.format("%s%d", prefix, num))
.map(this::wrapApiCall)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
我要运行多次外部调用操作,然后以list的形式获取结果。
我决定使用 CompletableFuture
api,我准备的代码非常恶心:
例子:
public class Main {
public static void main(String[] args) {
String prefix = "collection_";
List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(
() -> callApi(name)))
.collect(Collectors.toList());
try {
CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
List<User> users = usersResult //the result I need
.stream()
.map(userCompletableFuture -> {
try {
return userCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static User callApi(String collection) {
return new User(); //potentially time-consuming operation
}
}
我有以下问题:
- 我能否以某种方式避免在流中复制
try-catch
块,我在其中将 CompletableFuture 映射到用户? - 此代码能否减少顺序性(如何避免等待所有期货完成?)
这样可以吗(会不会所有的future都在stream中解决?):
public class Main { public static void main(String[] args) { String prefix = "collection_"; List<User> usersResult = IntStream.range(1, 10) .boxed() .map(num -> prefix.concat("" + num)) .map(name -> CompletableFuture.supplyAsync( () -> callApi(name))) .filter(Objects::nonNull) .map(userCompletableFuture -> { try { return userCompletableFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return null; }) .collect(Collectors.toList()); } private static User callApi(String collection) { return new User(); //potentially time-consuming operation } }
对于 1.,您可以完全跳过 allOf().get()
调用,因为您无论如何都在一个接一个地等待所有期货。¹
对于 2.,您可以通过执行以下操作来简化 try-catch
:
- 以后使用
exceptionally()
直接处理异常; - 使用
join()
而不是get()
来避免检查异常(而且你知道不可能有异常)。
对于 3.,您不能真正减少顺序,因为您至少需要以下步骤:创建所有期货,然后处理它们的结果。
如果你在一个流中做所有事情,它会创建每个未来,然后在创建下一个之前立即等待它——所以你会失去并行性。您可以改用并行流,但是使用 CompletableFuture
s.
所以最后的代码是:
List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
.exceptionally(e -> {
e.printStackTrace();
return null;
}))
.collect(Collectors.toList());
List<User> users = usersResult
.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
¹ 请注意,如果您希望结果也是 CompletableFuture<List<User>>
,则仍然需要 allOf()
调用,例如
final CompletableFuture<List<User>> result =
CompletableFuture.allOf(usersResult.stream().toArray(CompletableFuture[]::new))
.thenApply(__ -> usersResult
.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
或者,您可以删除 CompletableFuture
并使用 parallelStream()
,如 Didier 所述:
Optional<User> wrapApiCall(String name) {
try { return Optional.of(callApi(name)); }
catch (Exception e) {
e.printStackTrace();
return Optional.empty();
}
}
List<User> usersResult = IntStream.range(1, 10)
.boxed()
.parallelStream()
.map(num -> String.format("%s%d", prefix, num))
.map(this::wrapApiCall)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());