SupplyAsync 等待所有 CompletableFutures 完成
SupplyAsync wait for all CompletableFutures to finish
我在下面 运行 一些异步任务,需要等到它们全部完成。我不确定为什么,但是 join()
不会强制等待所有任务,并且代码会继续执行而无需等待。连接流未按预期工作是否有原因?
CompletableFutures 列表只是一个映射 supplyAsync 的流
List<Integer> items = Arrays.asList(1, 2, 3);
List<CompletableFuture<Integer>> futures = items
.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing");
// do some processing here
return item;
}))
.collect(Collectors.toList());
我等着未来的样子。
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
我可以使用 futures.forEach(CompletableFuture::join);
等待,但我想知道为什么我的流方法不起作用。
此代码:
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
是否不等待futures
中的所有期货完成。它所做的是创建一个新的未来,它将等待 futures
中的所有异步执行在它自己完成之前完成(但在所有这些未来完成之前不会阻塞)。当这个 allOf 未来完成时,然后你的 thenApply
代码 运行s。但是 allOf()
会立即 return 而不会阻塞。
这意味着 futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
在您的代码中仅 运行s 在 所有异步执行完成后,这违背了您的目的。 join()
个调用将立即全部 return。但这不是更大的问题。您的挑战是 allOf().thenApply()
不会等待异步执行完成。它只会创造另一个不会阻塞的未来。
最简单的解决方案是使用第一个管道并映射到一个整数列表:
List<Integer> results = items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing " + item);
// do some processing here
return item;
}))
.collect(Collectors.toList()) //force-submit all
.stream()
.map(CompletableFuture::join) //wait for each
.collect(Collectors.toList());
如果您想使用与原始代码类似的内容,则必须将您的第二个代码段更改为:
List<Integer> reuslts = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
那是因为 CompletableFuture.allOf
不等待,它只是将所有 futures 组合成一个新的,当所有 futures 完成时完成:
Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.
或者,您仍然可以使用 allOf()
和 join()
,然后 运行 您当前的 thenApply()
代码:
//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.join();
//join() calls below return immediately
List<Integer> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
join()
立即调用最后一条语句 return,因为 join()
对包装器 (allOf()
) 的调用 future 将等待传递给的所有 futures它来完成。这就是为什么当您可以使用第一种方法时我看不到这样做的原因。
我在下面 运行 一些异步任务,需要等到它们全部完成。我不确定为什么,但是 join()
不会强制等待所有任务,并且代码会继续执行而无需等待。连接流未按预期工作是否有原因?
CompletableFutures 列表只是一个映射 supplyAsync 的流
List<Integer> items = Arrays.asList(1, 2, 3);
List<CompletableFuture<Integer>> futures = items
.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing");
// do some processing here
return item;
}))
.collect(Collectors.toList());
我等着未来的样子。
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
我可以使用 futures.forEach(CompletableFuture::join);
等待,但我想知道为什么我的流方法不起作用。
此代码:
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
是否不等待futures
中的所有期货完成。它所做的是创建一个新的未来,它将等待 futures
中的所有异步执行在它自己完成之前完成(但在所有这些未来完成之前不会阻塞)。当这个 allOf 未来完成时,然后你的 thenApply
代码 运行s。但是 allOf()
会立即 return 而不会阻塞。
这意味着 futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
在您的代码中仅 运行s 在 所有异步执行完成后,这违背了您的目的。 join()
个调用将立即全部 return。但这不是更大的问题。您的挑战是 allOf().thenApply()
不会等待异步执行完成。它只会创造另一个不会阻塞的未来。
最简单的解决方案是使用第一个管道并映射到一个整数列表:
List<Integer> results = items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing " + item);
// do some processing here
return item;
}))
.collect(Collectors.toList()) //force-submit all
.stream()
.map(CompletableFuture::join) //wait for each
.collect(Collectors.toList());
如果您想使用与原始代码类似的内容,则必须将您的第二个代码段更改为:
List<Integer> reuslts = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
那是因为 CompletableFuture.allOf
不等待,它只是将所有 futures 组合成一个新的,当所有 futures 完成时完成:
Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.
或者,您仍然可以使用 allOf()
和 join()
,然后 运行 您当前的 thenApply()
代码:
//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.join();
//join() calls below return immediately
List<Integer> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
join()
立即调用最后一条语句 return,因为 join()
对包装器 (allOf()
) 的调用 future 将等待传递给的所有 futures它来完成。这就是为什么当您可以使用第一种方法时我看不到这样做的原因。