Why is CompletableFuture join/get faster in separate streams than using one stream
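For the following program, I'm trying to figure out why using 2 different streams parallelizes the tasks, whereas using the same stream and calling join/get on the CompletableFuture makes them take longer, as if they were being processed sequentially.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class HelloConcurrency {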
private static Integer sleepTask(int number) {
System.out.println(String.format("Task with sleep time %d", number));
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
return -1;
}
return number;
}
public static void main(String[] args) {
List<Integer> sleepTimes = Arrays.asList(1,2,3,4,5,6);
System.out.println("WITH SEPARATE STREAMS FOR FUTURE AND JOIN");
ExecutorService executorService = Executors.newFixedThreadPool(6);
long start = System.currentTimeMillis();
List<CompletableFuture<Integer>> futures = sleepTimes.stream()
.map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService)
.exceptionally(ex -> { ex.printStackTrace(); return -1; }))
.collect(Collectors.toList());
executorService.shutdown();
List<Integer> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long finish = System.currentTimeMillis();
long timeElapsed = (finish - start)/1000;
System.out.println(String.format("done in %d seconds.", timeElapsed));
System.out.println(result);
System.out.println("WITH SAME STREAM FOR FUTURE AND JOIN");
ExecutorService executorService2 = Executors.newFixedThreadPool(6);
start = System.currentTimeMillis();
List<Integer> results = sleepTimes.stream()
.map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService2)
.exceptionally(ex -> { ex.printStackTrace(); return -1; }))
.map(CompletableFuture::join)
.collect(Collectors.toList());
executorService2.shutdown();
finish = System.currentTimeMillis();
timeElapsed = (finish - start)/1000;
System.out.println(String.format("done in %d seconds.", timeElapsed));
System.out.println(results);
}
}
Output
WITH SEPARATE STREAMS FOR FUTURE AND JOIN
Task with sleep time 6
Task with sleep time 5
Task with sleep time 1
Task with sleep time 3
Task with sleep time 2
Task with sleep time 4
done in 6 seconds.
[1, 2, 3, 4, 5, 6]
WITH SAME STREAM FOR FUTURE AND JOIN
Task with sleep time 1
Task with sleep time 2
Task with sleep time 3
Task with sleep time 4
Task with sleep time 5
Task with sleep time 6
done in 21 seconds.
[1, 2, 3, 4, 5, 6]
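These two approaches are quite different; let me try to explain clearly.
First approach: you launch the Async requests for all 6 tasks first, and only then call join on each of them to get the results.
Second approach: you call join immediately after spinning up the Async request for each task. For example, after spinning up the Async thread for task 1, calling join makes sure that thread finishes the task, and only then is the second task spun up on an Async thread.
Note: If you look closely at the output, in the first approach the lines appear in random order because all six tasks run asynchronously. In the second approach, the tasks run sequentially, one after another.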
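To see the difference without waiting 21 seconds, here is a minimal sketch that runs both pipelines from the question with the sleeps scaled from seconds to 100 ms units. The scaling, the class name ScaledTiming and its two helper methods are assumptions for illustration. If the explanation above is right, the one-stream version can never finish in less than the sum of the sleeps (2100 ms), while the two-stream version, given one pool thread per task, should land near the longest single sleep (about 600 ms).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical demo class: the question's two pipelines with sleeps scaled to 100 ms units.
class ScaledTiming {

    private static Integer sleepTask(int units) {
        try {
            TimeUnit.MILLISECONDS.sleep(100L * units);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return units;
    }

    // Elapsed ms for "submit everything, then join in a second stream".
    static long twoStreamsMillis(List<Integer> units) {
        ExecutorService pool = Executors.newFixedThreadPool(units.size());
        long start = System.nanoTime();
        List<CompletableFuture<Integer>> futures = units.stream()
                .map(u -> CompletableFuture.supplyAsync(() -> sleepTask(u), pool))
                .collect(Collectors.toList()); // all tasks are submitted here
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        pool.shutdown();
        return elapsed;
    }

    // Elapsed ms for "submit and join inside one stream".
    static long oneStreamMillis(List<Integer> units) {
        ExecutorService pool = Executors.newFixedThreadPool(units.size());
        long start = System.nanoTime();
        units.stream()
             .map(u -> CompletableFuture.supplyAsync(() -> sleepTask(u), pool))
             .map(CompletableFuture::join) // blocks before the next element is pulled
             .collect(Collectors.toList());
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        pool.shutdown();
        return elapsed;
    }

    public static void main(String[] args) {
        List<Integer> units = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println("two streams: " + twoStreamsMillis(units) + " ms");
        System.out.println("one stream:  " + oneStreamMillis(units) + " ms");
    }
}
```

The exact numbers vary with machine load, but the ratio (max of the sleeps versus their sum) is the 6 s versus 21 s gap from the question.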
I'm sure you know how the stream map operation is executed; otherwise, see the java.util.stream package documentation:
To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream, such as filter(Predicate)), and a terminal operation (which produces a result or side-effect, such as count() or forEach(Consumer)). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.
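Because a sequential stream pulls source elements only as needed, in practice each element travels through every map stage before the next element is read. The sketch below (the class StageOrder is hypothetical, and the stage labels "submit"/"join" only echo the question; no futures are involved) logs the order in which two chained map stages see the elements. The interleaving is what OpenJDK's sequential streams do; the specification itself does not promise it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo: records which map stage sees which element, in order.
class StageOrder {

    static List<String> trace(List<Integer> source) {
        List<String> log = new ArrayList<>();
        source.stream()
              .map(x -> { log.add("submit " + x); return x; }) // stands in for supplyAsync
              .map(x -> { log.add("join " + x); return x; })   // stands in for join
              .collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        // On OpenJDK this prints [submit 1, join 1, submit 2, join 2, submit 3, join 3]
        System.out.println(trace(Arrays.asList(1, 2, 3)));
    }
}
```

If the real second stage is a blocking join, this element-at-a-time order is exactly what serializes the tasks.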
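The streams framework does not define the order in which map operations are executed on stream elements, because it is not intended for use cases where that could be a relevant issue. As a result, the particular way your second version executes is essentially equivalent to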
List<Integer> results = new ArrayList<>();
for (Integer sleepTime : sleepTimes) {
results.add(CompletableFuture
.supplyAsync(() -> sleepTask(sleepTime), executorService2)
.exceptionally(ex -> { ex.printStackTrace(); return -1; })
.join());
}
...which is essentially equivalent to
List<Integer> results = new ArrayList<>();
for (Integer sleepTime : sleepTimes) {
results.add(sleepTask(sleepTime));
}
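One way to check that equivalence is to count how many tasks are sleeping at the same moment. In this sketch, the class InFlight, its two counters and the fixed 200 ms sleep are assumptions for illustration. The one-stream pipeline should never have more than one task in flight, because each join returns only after its task has finished, while the two-stream pipeline submits everything before the first join.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical demo: measures the peak number of tasks running concurrently.
class InFlight {
    private static final AtomicInteger running = new AtomicInteger();
    private static final AtomicInteger peak = new AtomicInteger();

    private static Integer sleepTask(int n) {
        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            running.decrementAndGet(); // runs before the future completes
        }
        return n;
    }

    static int peakOneStream(List<Integer> xs) {
        peak.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(xs.size());
        xs.stream()
          .map(x -> CompletableFuture.supplyAsync(() -> sleepTask(x), pool))
          .map(CompletableFuture::join) // waits before the next task is submitted
          .collect(Collectors.toList());
        pool.shutdown();
        return peak.get();
    }

    static int peakTwoStreams(List<Integer> xs) {
        peak.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(xs.size());
        List<CompletableFuture<Integer>> futures = xs.stream()
          .map(x -> CompletableFuture.supplyAsync(() -> sleepTask(x), pool))
          .collect(Collectors.toList()); // every task is submitted here
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        pool.shutdown();
        return peak.get();
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println("one stream,  peak in flight: " + peakOneStream(xs));
        System.out.println("two streams, peak in flight: " + peakTwoStreams(xs));
    }
}
```

A peak of 1 for the single pipeline is the concurrency profile of the plain sequential loop above, even though the work still runs on pool threads.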
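@Deadpool answered well; I'm just adding my answer, which may help people understand it better.
By adding more print statements to both approaches, I was able to find the answer.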
TLDR
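2-stream approach: we start all 6 tasks asynchronously, then call join on each task in a separate stream to get the results.
1-stream approach: we call join immediately after starting each task. For example, after spinning up a thread for task 1, calling join makes the main thread wait for task 1 to complete, and only then is the second task started on an async thread.
Note: If we look closely at the output, in the 1-stream approach the lines appear in order because the six tasks are executed sequentially. In the 2-stream approach, all tasks are executed in parallel, so the order is random.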
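Note 2: If we replace stream() with parallelStream() in the 1-stream approach, it behaves much like the 2-stream approach, because the elements (and therefore the join calls) are then processed concurrently; how close it gets depends on the parallelism of the common ForkJoinPool.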
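A minimal sketch of Note 2, assuming sleeps scaled from seconds to 100 ms units and a hypothetical class name ParallelOneStream. Only stream() is swapped for parallelStream(); collect on an ordered parallel stream still returns the results in encounter order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical demo: the single-stream pipeline run as a parallel stream.
class ParallelOneStream {

    private static Integer sleepTask(int units) {
        try {
            TimeUnit.MILLISECONDS.sleep(100L * units);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return units;
    }

    static List<Integer> run(List<Integer> units) {
        ExecutorService pool = Executors.newFixedThreadPool(units.size());
        try {
            // Elements are handed to common-pool workers (and the caller),
            // so several join() calls can be blocked at the same time.
            return units.parallelStream()
                        .map(u -> CompletableFuture.supplyAsync(() -> sleepTask(u), pool))
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()); // keeps encounter order
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Integer> result = run(Arrays.asList(1, 2, 3, 4, 5, 6));
        long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(result + " in " + ms + " ms, common pool parallelism "
                + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

The elapsed time is deliberately not predicted here: it depends on the common pool's parallelism (typically one less than the number of available processors) and on how the pool handles workers blocked in join, so on a machine with few cores it can sit noticeably above the two-stream figure.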
More evidence
I added more print statements to the streams, which produced the following output and confirmed the notes above:
1 stream:
List<Integer> results = sleepTimes.stream()
.map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService2)
.exceptionally(ex -> { ex.printStackTrace(); return -1; }))
.map(f -> {
int num = f.join();
System.out.println(String.format("doing join on task %d", num));
return num;
})
.collect(Collectors.toList());
WITH SAME STREAM FOR FUTURE AND JOIN
Task with sleep time 1
doing join on task 1
Task with sleep time 2
doing join on task 2
Task with sleep time 3
doing join on task 3
Task with sleep time 4
doing join on task 4
Task with sleep time 5
doing join on task 5
Task with sleep time 6
doing join on task 6
done in 21 seconds.
[1, 2, 3, 4, 5, 6]
2 streams:
List<CompletableFuture<Integer>> futures = sleepTimes.stream()
.map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService)
.exceptionally(ex -> { ex.printStackTrace(); return -1; }))
.collect(Collectors.toList());
List<Integer> result = futures.stream()
.map(f -> {
int num = f.join();
System.out.println(String.format("doing join on task %d", num));
return num;
})
.collect(Collectors.toList());
WITH SEPARATE STREAMS FOR FUTURE AND JOIN
Task with sleep time 2
Task with sleep time 5
Task with sleep time 3
Task with sleep time 1
Task with sleep time 4
Task with sleep time 6
doing join on task 1
doing join on task 2
doing join on task 3
doing join on task 4
doing join on task 5
doing join on task 6
done in 6 seconds.
[1, 2, 3, 4, 5, 6]