CompletableFuture.allOf 使用哪个执行器?

Which executor does CompletableFuture.allOf use?

假设我们有两个执行器,1 和 2。

我们可以配置执行时使用哪个执行器

CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()-> {return 1;}, executor1) //executor1
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()-> {return 2;}, executor1) //executor1
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(()-> {return 3;}, executor2) //executor2

但是哪个线程执行器使用 CompletableFuture 静态方法 allOf?

CompletableFuture.allOf(cf1, cf2, cf3)

谢谢!

没有与 CompletableFuture#allOf 相关联的执行程序,它只是生成 CompletableFuture,它将在您将调用 CompletableFuture#get() 的同一线程中等待依赖项的完成。

在你的例子中,cf1cf2后面的任务仍将由executor1执行,cf2中的任务将由[=18=执行], allOf(..).get() 的结果将在当前线程中返回,并且不会在后台启动其他线程。

这里是示例,如何通过在 System.out.println 行设置断点并检查活动线程列表来观察 IDE 中的实际行为。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

import static java.util.concurrent.CompletableFuture.supplyAsync;

public class ExecutorTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Executor executor1 = Executors.newSingleThreadExecutor();
        Executor executor2 = Executors.newSingleThreadExecutor();
        CompletableFuture<Integer> cf1 = supplyAsync(run(1), executor1); //executor1
        CompletableFuture<Integer> cf2 = supplyAsync(run(2), executor1); //executor1
        CompletableFuture<Integer> cf3 = supplyAsync(run(3), executor2); //executor2
        CompletableFuture<Void> result = CompletableFuture.allOf(cf1, cf2, cf3);
        new Thread(() -> {
            try {
                result.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }).start();
        System.out.println("Waiting now...");
    }

    private static Supplier<Integer> run(int result) {
        return () -> runDelayed(result);
    }

    private static int runDelayed(int result) {
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }

}

不准确。

确实没有与 allOf() 编辑的 CompletableFuture return 关联的执行程序,因为事实上,从来没有与任何 CompletableFuture 关联的执行程序.

一个 task 与一个执行器相关联,因为它在其中 运行,但是关联是相反的:执行器有一个要执行的任务列表.

一个任务也可以与一个CompletableFuture相关联,它会在任务完成时完成。 CompletableFuture 本身不保留对用于创建它的任务或执行程序的引用。但是,它可能会保留对依赖阶段中使用的任务和可选执行程序的引用。

allOf()编辑的CompletableFuture return将由一个任务完成,该任务是原始CompletableFuture的依赖阶段。在您的示例中,此任务可以通过以下方式执行:

  • executor1,如果第三个任务先完成;
  • executor2,如果第2个任务先于第3个任务完成;或
  • 原始线程,如果所有任务在您调用 allOf() 之前完成。

这可以通过向 allOf() 调用添加依赖 thenRun() 阶段来看出:

public class CompletableFutureAllOfCompletion {
    private ExecutorService executor1 = Executors.newFixedThreadPool(2);
    private ExecutorService executor2 = Executors.newFixedThreadPool(2);
    private Random random = new Random();

    public static void main(String[] args) {
        new CompletableFutureAllOfCompletion().run();
    }

    public void run() {
        CompletableFuture<Integer> cf1 = supplyAsync(this::randomSleepAndReturn, executor1);
        CompletableFuture<Integer> cf2 = supplyAsync(this::randomSleepAndReturn, executor1);
        CompletableFuture<Integer> cf3 = supplyAsync(this::randomSleepAndReturn, executor2);
        randomSleepAndReturn();
        CompletableFuture.allOf(cf1, cf2, cf3)
                .thenRun(() -> System.out.println("allOf() commpleted on "
                        + Thread.currentThread().getName()));

        executor1.shutdown();
        executor2.shutdown();
    }

    public int randomSleepAndReturn() {
        try {
            final long millis = random.nextInt(1000);
            System.out.println(
                    Thread.currentThread().getName() + " waiting for " + millis);
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 0;
    }
}

一些可能的输出:

在第一个执行者上完成(第三个任务最先完成):

pool-1-thread-1 waiting for 937
pool-1-thread-2 waiting for 631
main waiting for 776
pool-2-thread-1 waiting for 615
allOf() commpleted on pool-1-thread-1

在第二个执行者上完成(第一个和第二个任务在第三个之前完成):

pool-1-thread-1 waiting for 308
pool-1-thread-2 waiting for 788
main waiting for 389
pool-2-thread-1 waiting for 863
allOf() commpleted on pool-2-thread-1

正在主线程上完成(所有任务在 allOf().thenRun() 之前完成):

pool-1-thread-1 waiting for 168
pool-1-thread-2 waiting for 292
main waiting for 941
pool-2-thread-1 waiting for 188
allOf() commpleted on main

如何控制allOf()(或anyOf()

后要使用的执行器

由于无法保证将使用哪个执行器,因此在调用其中一种方法后应进行 *Async(, executor) 调用以控制将使用哪个执行器。

如果您需要 return 这些调用之一的结果 CompletableFuture,只需在 return 之前添加一个 thenApplyAsync(i -> i, executor)