CompletableFutures 的 Future 上的 get 方法
get method on Future of CompletableFutures
我正在 java 中做一些多线程工作,我对 Future get
方法有疑问。
我有一个 ExecutorSevice
提交 n 个 Callable。提交 returns a Future
被添加到列表中(稍后在一些代码中)。 Callable 的 call
方法循环遍历列表,并且对于列表的每个项目 someMethod
和 returns 调用 CompletableFuture<T>
。然后,将 Completable 未来添加到地图中。
在循环结束时call
returns地图包含一堆CompletableFuture。
因此,初始列表 (uglyList
) 包含 <String, CompletableFuture<String>>
的 n 个映射。
我的问题是,当我在 uglyList
的元素上调用 get
时,执行将被阻止,直到访问整个列表(作为参数传递给 Callable)并且所有 CompletableFutures
已经插入地图了吧?
因为,我有一个疑问,有没有可能这个例子中的get
,也等待map中的CompletableFutures
完成?我不知道如何测试这个
public class A {
private class CallableC implements Callable<Map<String, CompletableFuture<String>>> {
private List<String> listString;
Map<String, CompletableFuture<String>> futureMap;
public CallableC(List<String> listString) throws Exception {
this.listString = listString;
this.futureMap = new HashMap<>();
}
@Override
public Map<String, CompletableFuture<String>> call() throws Exception {
for (int j = 0; j < listString.size(); ++j) {
CompletableFuture<String> future = someMethod();
futureMap.put(listString.get(i), future);
}
return futureMap;
}
}
public void multiThreadMethod(List<String> items) throws Exception {
List<Future<Map<String, CompletableFuture<String>>>> uglyList = new ArrayList<>();
int coresNumber = (Runtime.getRuntime().availableProcessors());
// This method split a List in n subList
List<List<String>> splittedList = split(items, coresNumber);
ExecutorService executor = Executors.newFixedThreadPool(coresNumber);
try {
for (int i = 0; i < coresNumber; ++i) {
threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
for (int i = 0; i < coresNumber; ++i) {
uglyList.get(i).get(); //here
}
} catch (Exception e) {
// YAY errors
}
finally {
executor.shutdown();
}
}
}
when i call get on a element of uglyList the execution is blocked until the entire list (passed as parameter to the Callable) is visited and all CompletableFutures have been inserted in the map, right?
是的,没错。 get()
将等待 Callable
完成,在您的情况下,这意味着 CallableC#call()
方法结束,这将在 for 循环结束时到达,当 CompletableFutures
全部创建并插入到 futureMap
中,而不管那些 CompletableFuture
的状态/完成。
it is possible that the get in this example, also waits the completion of the CompletableFutures in the map?
可以实现这一点,但需要额外的代码。
在您的代码中执行此操作的最小方法是在任务内的 CompletableFuture
上添加一个连接调用。
@Override
public Map<String, CompletableFuture<String>> call() throws Exception {
for (int j = 0; j < listString.size(); ++j) {
CompletableFuture<String> future = someMethod();
futureMap.put(listString.get(j), future);
}
// This will wait for all the futures inside the map to terminate
try {
CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
} catch (CompletionException e) {
// ignored - the status will be checked later on
}
return futureMap;
}
另一种方法是在最后等待(为简洁起见,我跳过了异常处理):
for (int i = 0; i < coresNumber; ++i) {
threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
// This will wait for all the futures to terminate (thanks to f.get())
// And then, all the completable futures will be added to an array of futures that will also be waited upon
CompletableFuture.allOf(
uglyList.stream().flatMap(f -> f.get().values().stream()).toArray(CompletableFuture[]::new)
).join();
但如果可能的话,我会通过仅使用可完成的期货来简化整个代码,除非您有充分的理由按照您的方式中断任务(使用列表列表、专用执行器等) ...)。这可能看起来像这样(可能有一些错别字)
List<String> items = ...
ConcurrentMap<String, String> resultsByItem = new ConcurrentHashMap<>();
Future<Void> allWork = CompletableFuture.allOf(
items.stream()
.map(item -> someWork(item) // Get a completable future for this item
.andThen(result -> resultsByItem.put(item, result)) // Append the result to the map
).toArray(CompletableFuture[]::new)
);
allWork.join();
// Your futureMap is completed.
这将替换您在问题中的整个代码。
我正在 java 中做一些多线程工作,我对 Future get
方法有疑问。
我有一个 ExecutorSevice
提交 n 个 Callable。提交 returns a Future
被添加到列表中(稍后在一些代码中)。 Callable 的 call
方法循环遍历列表,并且对于列表的每个项目 someMethod
和 returns 调用 CompletableFuture<T>
。然后,将 Completable 未来添加到地图中。
在循环结束时call
returns地图包含一堆CompletableFuture。
因此,初始列表 (uglyList
) 包含 <String, CompletableFuture<String>>
的 n 个映射。
我的问题是,当我在 uglyList
的元素上调用 get
时,执行将被阻止,直到访问整个列表(作为参数传递给 Callable)并且所有 CompletableFutures
已经插入地图了吧?
因为,我有一个疑问,有没有可能这个例子中的get
,也等待map中的CompletableFutures
完成?我不知道如何测试这个
public class A {
private class CallableC implements Callable<Map<String, CompletableFuture<String>>> {
private List<String> listString;
Map<String, CompletableFuture<String>> futureMap;
public CallableC(List<String> listString) throws Exception {
this.listString = listString;
this.futureMap = new HashMap<>();
}
@Override
public Map<String, CompletableFuture<String>> call() throws Exception {
for (int j = 0; j < listString.size(); ++j) {
CompletableFuture<String> future = someMethod();
futureMap.put(listString.get(i), future);
}
return futureMap;
}
}
public void multiThreadMethod(List<String> items) throws Exception {
List<Future<Map<String, CompletableFuture<String>>>> uglyList = new ArrayList<>();
int coresNumber = (Runtime.getRuntime().availableProcessors());
// This method split a List in n subList
List<List<String>> splittedList = split(items, coresNumber);
ExecutorService executor = Executors.newFixedThreadPool(coresNumber);
try {
for (int i = 0; i < coresNumber; ++i) {
threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
for (int i = 0; i < coresNumber; ++i) {
uglyList.get(i).get(); //here
}
} catch (Exception e) {
// YAY errors
}
finally {
executor.shutdown();
}
}
}
when i call get on a element of uglyList the execution is blocked until the entire list (passed as parameter to the Callable) is visited and all CompletableFutures have been inserted in the map, right?
是的,没错。 get()
将等待 Callable
完成,在您的情况下,这意味着 CallableC#call()
方法结束,这将在 for 循环结束时到达,当 CompletableFutures
全部创建并插入到 futureMap
中,而不管那些 CompletableFuture
的状态/完成。
it is possible that the get in this example, also waits the completion of the CompletableFutures in the map?
可以实现这一点,但需要额外的代码。
在您的代码中执行此操作的最小方法是在任务内的 CompletableFuture
上添加一个连接调用。
@Override
public Map<String, CompletableFuture<String>> call() throws Exception {
for (int j = 0; j < listString.size(); ++j) {
CompletableFuture<String> future = someMethod();
futureMap.put(listString.get(j), future);
}
// This will wait for all the futures inside the map to terminate
try {
CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
} catch (CompletionException e) {
// ignored - the status will be checked later on
}
return futureMap;
}
另一种方法是在最后等待(为简洁起见,我跳过了异常处理):
for (int i = 0; i < coresNumber; ++i) {
threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
// This will wait for all the futures to terminate (thanks to f.get())
// And then, all the completable futures will be added to an array of futures that will also be waited upon
CompletableFuture.allOf(
uglyList.stream().flatMap(f -> f.get().values().stream()).toArray(CompletableFuture[]::new)
).join();
但如果可能的话,我会通过仅使用可完成的期货来简化整个代码,除非您有充分的理由按照您的方式中断任务(使用列表列表、专用执行器等) ...)。这可能看起来像这样(可能有一些错别字)
List<String> items = ...
ConcurrentMap<String, String> resultsByItem = new ConcurrentHashMap<>();
Future<Void> allWork = CompletableFuture.allOf(
items.stream()
.map(item -> someWork(item) // Get a completable future for this item
.andThen(result -> resultsByItem.put(item, result)) // Append the result to the map
).toArray(CompletableFuture[]::new)
);
allWork.join();
// Your futureMap is completed.
这将替换您在问题中的整个代码。