Java 从多个调用中收集 CompletableFuture 的结果

Java collecting results of CompletableFuture from multiple calls

我要运行多次外部调用操作,然后以list的形式获取结果。 我决定使用 CompletableFuture api,我准备的代码非常恶心:

例子:

public class Main {
    public static void main(String[] args) {
        String prefix = "collection_";

        List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
                .boxed()
                .map(num -> prefix.concat("" + num))
                .map(name -> CompletableFuture.supplyAsync(
                        () -> callApi(name)))
                .collect(Collectors.toList());

        try {
            CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        List<User> users = usersResult //the result I need
                .stream()
                .map(userCompletableFuture -> {
                    try {
                        return userCompletableFuture.get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    private static User callApi(String collection) {
        return new User(); //potentially time-consuming operation
    }
}

我有以下问题:

  1. 我能否以某种方式避免在流中复制 try-catch 块,我在其中将 CompletableFuture 映射到用户?
  2. 此代码能否减少顺序性(如何避免等待所有期货完成?)
  3. 这样可以吗(会不会所有的future都在stream中解决?):

    public class Main {
        public static void main(String[] args) {
            String prefix = "collection_";
    
            List<User> usersResult = IntStream.range(1, 10)
                    .boxed()
                    .map(num -> prefix.concat("" + num))
                    .map(name -> CompletableFuture.supplyAsync(
                            () -> callApi(name)))
                    .filter(Objects::nonNull)
                    .map(userCompletableFuture -> {
                        try {
                            return userCompletableFuture.get();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }
                        return null;
                    })
                    .collect(Collectors.toList());
        }
    
        private static User callApi(String collection) {
            return new User(); //potentially time-consuming operation
        }
    }
    

对于 1.,您可以完全跳过 allOf().get() 调用,因为您无论如何都在一个接一个地等待所有期货。¹

对于 2.,您可以通过执行以下操作来简化 try-catch

  • 以后使用exceptionally()直接处理异常;
  • 使用 join() 而不是 get() 来避免检查异常(而且你知道不可能有异常)。

对于 3.,您不能真正减少顺序,因为您至少需要以下步骤:创建所有期货,然后处理它们的结果。

如果你在一个流中做所有事情,它会创建每个未来,然后在创建下一个之前立即等待它——所以你会失去并行性。您可以改用并行流,但是使用 CompletableFutures.

不会有太大好处

所以最后的代码是:

List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
        .boxed()
        .map(num -> prefix.concat("" + num))
        .map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
            .exceptionally(e -> {
                e.printStackTrace();
                return null;
            }))
        .collect(Collectors.toList());

List<User> users = usersResult
        .stream()
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());

¹ 请注意,如果您希望结果也是 CompletableFuture<List<User>>,则仍然需要 allOf() 调用,例如

final CompletableFuture<List<User>> result =
        CompletableFuture.allOf(usersResult.stream().toArray(CompletableFuture[]::new))
                .thenApply(__ -> usersResult
                        .stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));

或者,您可以删除 CompletableFuture 并使用 parallelStream(),如 D​​idier 所述:

Optional<User> wrapApiCall(String name) {
    try { return Optional.of(callApi(name)); }
    catch (Exception e) { 
        e.printStackTrace();
        return Optional.empty(); 
    }
}

List<User> usersResult = IntStream.range(1, 10)
    .boxed()
    .parallelStream()
    .map(num -> String.format("%s%d", prefix, num))
    .map(this::wrapApiCall)
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(Collectors.toList());