CompletableFuture.allOf 使用哪个执行器?
Which executor does CompletableFuture.allOf use?
假设我们有两个执行器,1 和 2。
我们可以配置执行时使用哪个执行器
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()-> {return 1;}, executor1) //executor1
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()-> {return 2;}, executor1) //executor1
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(()-> {return 3;}, executor2) //executor2
但是哪个线程执行器使用 CompletableFuture 静态方法 allOf?
CompletableFuture.allOf(cf1, cf2, cf3)
谢谢!
没有与 CompletableFuture#allOf
相关联的执行程序,它只是生成 CompletableFuture
,它将在您将调用 CompletableFuture#get()
的同一线程中等待依赖项的完成。
在你的例子中,cf1
和cf2
后面的任务仍将由executor1
执行,cf2
中的任务将由[=18=执行], allOf(..).get()
的结果将在当前线程中返回,并且不会在后台启动其他线程。
这里是示例,如何通过在 System.out.println 行设置断点并检查活动线程列表来观察 IDE 中的实际行为。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import static java.util.concurrent.CompletableFuture.supplyAsync;
public class ExecutorTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Executor executor1 = Executors.newSingleThreadExecutor();
Executor executor2 = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> cf1 = supplyAsync(run(1), executor1); //executor1
CompletableFuture<Integer> cf2 = supplyAsync(run(2), executor1); //executor1
CompletableFuture<Integer> cf3 = supplyAsync(run(3), executor2); //executor2
CompletableFuture<Void> result = CompletableFuture.allOf(cf1, cf2, cf3);
new Thread(() -> {
try {
result.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
System.out.println("Waiting now...");
}
private static Supplier<Integer> run(int result) {
return () -> runDelayed(result);
}
private static int runDelayed(int result) {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}
不准确。
确实没有与 allOf()
编辑的 CompletableFuture
return 关联的执行程序,因为事实上,从来没有与任何 CompletableFuture
关联的执行程序.
一个 task 与一个执行器相关联,因为它在其中 运行,但是关联是相反的:执行器有一个要执行的任务列表.
一个任务也可以与一个CompletableFuture
相关联,它会在任务完成时完成。 CompletableFuture
本身不保留对用于创建它的任务或执行程序的引用。但是,它可能会保留对依赖阶段中使用的任务和可选执行程序的引用。
由allOf()
编辑的CompletableFuture
return将由一个任务完成,该任务是原始CompletableFuture
的依赖阶段。在您的示例中,此任务可以通过以下方式执行:
executor1
,如果第三个任务先完成;
executor2
,如果第2个任务先于第3个任务完成;或
- 原始线程,如果所有任务在您调用
allOf()
之前完成。
这可以通过向 allOf()
调用添加依赖 thenRun()
阶段来看出:
public class CompletableFutureAllOfCompletion {
private ExecutorService executor1 = Executors.newFixedThreadPool(2);
private ExecutorService executor2 = Executors.newFixedThreadPool(2);
private Random random = new Random();
public static void main(String[] args) {
new CompletableFutureAllOfCompletion().run();
}
public void run() {
CompletableFuture<Integer> cf1 = supplyAsync(this::randomSleepAndReturn, executor1);
CompletableFuture<Integer> cf2 = supplyAsync(this::randomSleepAndReturn, executor1);
CompletableFuture<Integer> cf3 = supplyAsync(this::randomSleepAndReturn, executor2);
randomSleepAndReturn();
CompletableFuture.allOf(cf1, cf2, cf3)
.thenRun(() -> System.out.println("allOf() commpleted on "
+ Thread.currentThread().getName()));
executor1.shutdown();
executor2.shutdown();
}
public int randomSleepAndReturn() {
try {
final long millis = random.nextInt(1000);
System.out.println(
Thread.currentThread().getName() + " waiting for " + millis);
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
}
}
一些可能的输出:
在第一个执行者上完成(第三个任务最先完成):
pool-1-thread-1 waiting for 937
pool-1-thread-2 waiting for 631
main waiting for 776
pool-2-thread-1 waiting for 615
allOf() commpleted on pool-1-thread-1
在第二个执行者上完成(第一个和第二个任务在第三个之前完成):
pool-1-thread-1 waiting for 308
pool-1-thread-2 waiting for 788
main waiting for 389
pool-2-thread-1 waiting for 863
allOf() commpleted on pool-2-thread-1
正在主线程上完成(所有任务在 allOf().thenRun()
之前完成):
pool-1-thread-1 waiting for 168
pool-1-thread-2 waiting for 292
main waiting for 941
pool-2-thread-1 waiting for 188
allOf() commpleted on main
如何控制allOf()
(或anyOf()
)
后要使用的执行器
由于无法保证将使用哪个执行器,因此在调用其中一种方法后应进行 *Async(, executor)
调用以控制将使用哪个执行器。
如果您需要 return 这些调用之一的结果 CompletableFuture
,只需在 return 之前添加一个 thenApplyAsync(i -> i, executor)
。
假设我们有两个执行器,1 和 2。
我们可以配置执行时使用哪个执行器
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()-> {return 1;}, executor1) //executor1
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()-> {return 2;}, executor1) //executor1
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(()-> {return 3;}, executor2) //executor2
但是哪个线程执行器使用 CompletableFuture 静态方法 allOf?
CompletableFuture.allOf(cf1, cf2, cf3)
谢谢!
没有与 CompletableFuture#allOf
相关联的执行程序,它只是生成 CompletableFuture
,它将在您将调用 CompletableFuture#get()
的同一线程中等待依赖项的完成。
在你的例子中,cf1
和cf2
后面的任务仍将由executor1
执行,cf2
中的任务将由[=18=执行], allOf(..).get()
的结果将在当前线程中返回,并且不会在后台启动其他线程。
这里是示例,如何通过在 System.out.println 行设置断点并检查活动线程列表来观察 IDE 中的实际行为。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import static java.util.concurrent.CompletableFuture.supplyAsync;
public class ExecutorTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Executor executor1 = Executors.newSingleThreadExecutor();
Executor executor2 = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> cf1 = supplyAsync(run(1), executor1); //executor1
CompletableFuture<Integer> cf2 = supplyAsync(run(2), executor1); //executor1
CompletableFuture<Integer> cf3 = supplyAsync(run(3), executor2); //executor2
CompletableFuture<Void> result = CompletableFuture.allOf(cf1, cf2, cf3);
new Thread(() -> {
try {
result.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
System.out.println("Waiting now...");
}
private static Supplier<Integer> run(int result) {
return () -> runDelayed(result);
}
private static int runDelayed(int result) {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}
确实没有与 allOf()
编辑的 CompletableFuture
return 关联的执行程序,因为事实上,从来没有与任何 CompletableFuture
关联的执行程序.
一个 task 与一个执行器相关联,因为它在其中 运行,但是关联是相反的:执行器有一个要执行的任务列表.
一个任务也可以与一个CompletableFuture
相关联,它会在任务完成时完成。 CompletableFuture
本身不保留对用于创建它的任务或执行程序的引用。但是,它可能会保留对依赖阶段中使用的任务和可选执行程序的引用。
由allOf()
编辑的CompletableFuture
return将由一个任务完成,该任务是原始CompletableFuture
的依赖阶段。在您的示例中,此任务可以通过以下方式执行:
executor1
,如果第三个任务先完成;executor2
,如果第2个任务先于第3个任务完成;或- 原始线程,如果所有任务在您调用
allOf()
之前完成。
这可以通过向 allOf()
调用添加依赖 thenRun()
阶段来看出:
public class CompletableFutureAllOfCompletion {
private ExecutorService executor1 = Executors.newFixedThreadPool(2);
private ExecutorService executor2 = Executors.newFixedThreadPool(2);
private Random random = new Random();
public static void main(String[] args) {
new CompletableFutureAllOfCompletion().run();
}
public void run() {
CompletableFuture<Integer> cf1 = supplyAsync(this::randomSleepAndReturn, executor1);
CompletableFuture<Integer> cf2 = supplyAsync(this::randomSleepAndReturn, executor1);
CompletableFuture<Integer> cf3 = supplyAsync(this::randomSleepAndReturn, executor2);
randomSleepAndReturn();
CompletableFuture.allOf(cf1, cf2, cf3)
.thenRun(() -> System.out.println("allOf() commpleted on "
+ Thread.currentThread().getName()));
executor1.shutdown();
executor2.shutdown();
}
public int randomSleepAndReturn() {
try {
final long millis = random.nextInt(1000);
System.out.println(
Thread.currentThread().getName() + " waiting for " + millis);
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
}
}
一些可能的输出:
在第一个执行者上完成(第三个任务最先完成):
pool-1-thread-1 waiting for 937
pool-1-thread-2 waiting for 631
main waiting for 776
pool-2-thread-1 waiting for 615
allOf() commpleted on pool-1-thread-1
在第二个执行者上完成(第一个和第二个任务在第三个之前完成):
pool-1-thread-1 waiting for 308
pool-1-thread-2 waiting for 788
main waiting for 389
pool-2-thread-1 waiting for 863
allOf() commpleted on pool-2-thread-1
正在主线程上完成(所有任务在 allOf().thenRun()
之前完成):
pool-1-thread-1 waiting for 168
pool-1-thread-2 waiting for 292
main waiting for 941
pool-2-thread-1 waiting for 188
allOf() commpleted on main
如何控制allOf()
(或anyOf()
)
后要使用的执行器
由于无法保证将使用哪个执行器,因此在调用其中一种方法后应进行 *Async(, executor)
调用以控制将使用哪个执行器。
如果您需要 return 这些调用之一的结果 CompletableFuture
,只需在 return 之前添加一个 thenApplyAsync(i -> i, executor)
。