为什么主线程不等待其他异步进程(线程)完成。 allOff 无法正常工作
Why main thread is not waiting for other async process(threads) to complete. allOff not working properly
我正在 for 循环中调用一个异步方法并将其未来添加到列表中。我不确定为什么 allOff 不等待完成所有期货并返回部分结果。看看我的代码。
我有一个重写的方法
@Overide
@Async
CompletableFuture<someType> fetchData()
{
returns new CompletableFuture<someType>();
}
我在不同实例的 for 循环中调用上述方法。
获取实现一个接口的所有 bean,该接口具有方法 fetchData。
Map<String, SomeClass> allBeans =context.getBeansOfType(SomeClass.class);
List<SomeClass> list=
allBeans.values().stream().collect(SomeClass.toList());
for (SomeClass classInstance: list) {
classInstance.fetchData().thenApply(x->{
//Some DB persistence Call
futureList.add(x);
});
}
之后我应用 allOff 以便所有未来都可以完成,但它不会等待所有和主线程执行剩余的流程。
CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(
futureList.toArray(new CompletableFuture[futureList.size()]));
CompletableFuture<List<futureResponse>> finalList=
combinedFutures.thenApply(v -> {
return futureList.stream().map(m ->
m.join()).collect(Collectors.toList());
});
finalList- 在此列表中,我想要 fetch 返回的所有已完成的期货
调用。
在 finalList 中,我总是得到 2 个对象,但 fetchData 得到 运行 5 次(基于实例数),我在所有剩余的异步调用完成后看到了日志。有人可以帮忙吗。
观察:- 让主线程休眠 30 秒后,我可以看到列表中有所有 5 个对象。有人能告诉我为什么主线程根本不等待所有 futures 完成吗?
您有竞争条件:
classInstance.fetchData().thenApply(x->{
futureList.add(x);
});
这段代码的意思是,只有当未来的x
完成时,才会将x
添加到futureList
。这可能需要 10 毫秒或 2 小时,谁知道呢? (如果未来异常失败,它可能永远不会)。
所以,当代码到达
CompletableFuture.allOf(futureList....
没有 "guarantee" 调用了 thenApply
。 futureList
甚至可以为空。
您可以更正此代码的一种方法如下:
Map<String, SomeClass> allBeans =context.getBeansOfType(SomeClass.class);
List<SomeClass> list=
allBeans.values().stream().collect(SomeClass.toList());
for (SomeClass classInstance: list) {
futureList.add(classInstance.fetchData());
}
或者如果您确实需要在 thenApply
中做某事:
for (SomeClass classInstance: list) {
futureList.add(
classInstance.fetchData().thenApply(x -> whatever(x))
);
}
这样,您的 futureList
不会在异步结果 returns 时被填充(这是未知的,甚至可能因从未发生的异常而失败),而是在异步调用时立即填充已创建,这就是您真正想要的。
IIUC,你想做的可以做的更简单
CompletableFuture<List<FutureResponse>> = CompletableFuture.supplyAsync(() -> {
// start fetches and collect the individual futures
List<CompletableFuture> fetches =
allBeans.values().stream()
.map(SomeClass::fetchData)
.collect(toList());
// join all futures and return the list of the results
return fetches.stream()
.map(CompletableFuture::join)
.collect(toList());
}
我认为您不能在单个流中执行此操作(即映射到 fetch
,然后立即映射到 join
),因为这可能会在创建下一个未来之前等待连接。
我正在 for 循环中调用一个异步方法并将其未来添加到列表中。我不确定为什么 allOff 不等待完成所有期货并返回部分结果。看看我的代码。
我有一个重写的方法
@Overide
@Async
CompletableFuture<someType> fetchData()
{
returns new CompletableFuture<someType>();
}
我在不同实例的 for 循环中调用上述方法。
获取实现一个接口的所有 bean,该接口具有方法 fetchData。
Map<String, SomeClass> allBeans =context.getBeansOfType(SomeClass.class);
List<SomeClass> list=
allBeans.values().stream().collect(SomeClass.toList());
for (SomeClass classInstance: list) {
classInstance.fetchData().thenApply(x->{
//Some DB persistence Call
futureList.add(x);
});
}
之后我应用 allOff 以便所有未来都可以完成,但它不会等待所有和主线程执行剩余的流程。
CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(
futureList.toArray(new CompletableFuture[futureList.size()]));
CompletableFuture<List<futureResponse>> finalList=
combinedFutures.thenApply(v -> {
return futureList.stream().map(m ->
m.join()).collect(Collectors.toList());
});
finalList- 在此列表中,我想要 fetch 返回的所有已完成的期货 调用。
在 finalList 中,我总是得到 2 个对象,但 fetchData 得到 运行 5 次(基于实例数),我在所有剩余的异步调用完成后看到了日志。有人可以帮忙吗。
观察:- 让主线程休眠 30 秒后,我可以看到列表中有所有 5 个对象。有人能告诉我为什么主线程根本不等待所有 futures 完成吗?
您有竞争条件:
classInstance.fetchData().thenApply(x->{
futureList.add(x);
});
这段代码的意思是,只有当未来的x
完成时,才会将x
添加到futureList
。这可能需要 10 毫秒或 2 小时,谁知道呢? (如果未来异常失败,它可能永远不会)。
所以,当代码到达
CompletableFuture.allOf(futureList....
没有 "guarantee" 调用了 thenApply
。 futureList
甚至可以为空。
您可以更正此代码的一种方法如下:
Map<String, SomeClass> allBeans =context.getBeansOfType(SomeClass.class);
List<SomeClass> list=
allBeans.values().stream().collect(SomeClass.toList());
for (SomeClass classInstance: list) {
futureList.add(classInstance.fetchData());
}
或者如果您确实需要在 thenApply
中做某事:
for (SomeClass classInstance: list) {
futureList.add(
classInstance.fetchData().thenApply(x -> whatever(x))
);
}
这样,您的 futureList
不会在异步结果 returns 时被填充(这是未知的,甚至可能因从未发生的异常而失败),而是在异步调用时立即填充已创建,这就是您真正想要的。
IIUC,你想做的可以做的更简单
CompletableFuture<List<FutureResponse>> = CompletableFuture.supplyAsync(() -> {
// start fetches and collect the individual futures
List<CompletableFuture> fetches =
allBeans.values().stream()
.map(SomeClass::fetchData)
.collect(toList());
// join all futures and return the list of the results
return fetches.stream()
.map(CompletableFuture::join)
.collect(toList());
}
我认为您不能在单个流中执行此操作(即映射到 fetch
,然后立即映射到 join
),因为这可能会在创建下一个未来之前等待连接。