CompletableFuture allof(..).join() 与 CompletableFuture.join()

CompletableFuture allof(..).join() vs CompletableFuture.join()

我目前正在使用 CompletableFuture supplyAsync() 方法将一些任务提交到公共线程池。下面是代码片段:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道下面的代码与上面的代码有何不同。我从下面的代码中删除了父 completableFuture,并为每个 completableFuture 添加了 join() 而不是 getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在 spring 服务中使用它,但存在线程池耗尽问题。非常感谢任何指点。

首先,.getNow() 不起作用,因为此方法需要一个回退值作为未来尚未完成的情况的参数。既然你假设未来要在这里完成,你也应该使用join().

然后,线程耗尽没有区别,因为在这两种情况下,您都在等待所有作业完成后再继续,可能会阻塞当前线程。

避免这种情况的唯一方法是重构代码,使其不期望同步结果,而是在所有作业完成后安排后续处理操作完成。然后,使用 allOf 变得相关:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
    .map(resolver -> supplyAsync(() -> task.doWork()))
    .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
    .thenAccept(justVoid -> {
        // here, all jobs have been completed
        final List<Test> tests = completableFutures.stream()
            .flatMap(completableFuture -> completableFuture.join().stream())
            .collect(toList());
        // process the result here
    });

顺便说一句,关于集合的toArray方法,我推荐阅读Arrays of Wisdom of the Ancients