为什么主线程不等待其他异步进程(线程)完成。 allOff 无法正常工作

Why main thread is not waiting for other async process(threads) to complete. allOff not working properly

我正在 for 循环中调用一个异步方法并将其未来添加到列表中。我不确定为什么 allOff 不等待完成所有期货并返回部分结果。看看我的代码。

我有一个重写的方法

@Overide
@Async
CompletableFuture<someType> fetchData()
{
returns new CompletableFuture<someType>();
}

我在不同实例的 for 循环中调用上述方法。

获取实现一个接口的所有 bean,该接口具有方法 fetchData。

Map<String, SomeClass> allBeans =context.getBeansOfType(SomeClass.class);

List<SomeClass> list=
    allBeans.values().stream().collect(SomeClass.toList());

for (SomeClass classInstance: list) {
  classInstance.fetchData().thenApply(x->{
    //Some DB persistence Call
    futureList.add(x);
  });
}

之后我应用 allOff 以便所有未来都可以完成,但它不会等待所有和主线程执行剩余的流程。

CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(
      futureList.toArray(new CompletableFuture[futureList.size()]));

  CompletableFuture<List<futureResponse>> finalList=
      combinedFutures.thenApply(v -> {
        return futureList.stream().map(m -> 
        m.join()).collect(Collectors.toList());
       });

finalList- 在此列表中,我想要 fetch 返回的所有已完成的期货 调用。

在 finalList 中,我总是得到 2 个对象,但 fetchData 得到 运行 5 次(基于实例数),我在所有剩余的异步调用完成后看到了日志。有人可以帮忙吗。

观察:- 让主线程休眠 30 秒后,我可以看到列表中有所有 5 个对象。有人能告诉我为什么主线程根本不等待所有 futures 完成吗?

您有竞争条件:

classInstance.fetchData().thenApply(x->{
    futureList.add(x);
});

这段代码的意思是,只有当未来的x完成时,才会将x添加到futureList。这可能需要 10 毫秒或 2 小时,谁知道呢? (如果未来异常失败,它可能永远不会)。

所以,当代码到达

CompletableFuture.allOf(futureList....

没有 "guarantee" 调用了 thenApplyfutureList 甚至可以为空。

您可以更正此代码的一种方法如下:

Map<String, SomeClass> allBeans =context.getBeansOfType(SomeClass.class);


List<SomeClass> list=
    allBeans.values().stream().collect(SomeClass.toList());

 for (SomeClass classInstance: list) {
    futureList.add(classInstance.fetchData());
 }

或者如果您确实需要在 thenApply 中做某事:

 for (SomeClass classInstance: list) {
    futureList.add(
        classInstance.fetchData().thenApply(x -> whatever(x))
    );
 }

这样,您的 futureList 不会在异步结果 returns 时被填充(这是未知的,甚至可能因从未发生的异常而失败),而是在异步调用时立即填充已创建,这就是您真正想要的。

IIUC,你想做的可以做的更简单

CompletableFuture<List<FutureResponse>> = CompletableFuture.supplyAsync(() -> {
    // start fetches and collect the individual futures
    List<CompletableFuture> fetches = 
       allBeans.values().stream()
               .map(SomeClass::fetchData)
               .collect(toList());
    // join all futures and return the list of the results
    return fetches.stream()
               .map(CompletableFuture::join)
               .collect(toList());
}

我认为您不能在单个流中执行此操作(即映射到 fetch,然后立即映射到 join),因为这可能会在创建下一个未来之前等待连接。