CompletableFuture join() 调用挂起主线程。个人期货永远不会完成
CompletableFuture join() call hangs main thread. Individual futures never get completed
我正在编写一个创建多个 (7) CompletableFutures 的函数。这些期货中的每一个基本上都做了两件事:
- 使用 supplyAsync(),从一些数据库中获取数据
- 使用 thenAccept(),将此数据写入 CSV 文件
当7个futures都完成工作后,我想继续进一步的代码执行。所以,我使用 allOf() 然后在 Void CompletableFuture returned 上调用 join() 由 allOf()。
问题是,即使所有 futures 都已执行(我可以看到生成了 CSV),join() 调用仍然卡住,进一步的代码执行永远被阻止。
我尝试了以下方法:
在每个未来之后一个接一个地等待每个未来调用 join() 。这是有效的,但是以并发为代价。我不想这样做。
尝试使用带超时的 get() 而不是 join()。但是,这总是会引发异常(因为 get 总是超时),这是不可取的。
看到这个 JDK 错误:https://bugs.openjdk.java.net/browse/JDK-8200347。不确定这是否是一个类似的问题。
在没有 join() 或 get() 的情况下尝试 运行,这将不会保持线程执行并且再次不可取。
创建所有期货的主要功能。
public CustomResponse process() {
CustomResponse msgResponse = new CustomResponse();
try {
// 1. DbCall 1
CompletableFuture<Void> f1 = dataHelper.fetchAndUploadCSV1();
// 2. DbCall 2
CompletableFuture<Void> f2 = dataHelper.fetchAndUploadCSV2();
// 3. DbCall 3
CompletableFuture<Void> f3 = dataHelper.fetchAndUploadCSV3();
// 4. DbCall 4
CompletableFuture<Void> f4 = dataHelper.fetchAndUploadCSV4();
// 5. DbCall 5
CompletableFuture<Void> f5 = dataHelper.fetchAndUploadCSV5();
// 6. DbCall 6
CompletableFuture<Void> f6 = dataHelper.fetchAndUploadCSV6();
// 7. DbCall 7
CompletableFuture<Void> f7 = dataHelper.fetchAndUploadCSV7();
CompletableFuture<Void>[] fAll = new CompletableFuture[] {f1, f2, f3, f4, f5, f6, f7};
CompletableFuture.allOf(fAll).join();
msgResponse.setProcessed(true);
msgResponse.setMessageStatus("message");
} catch (Exception e) {
msgResponse.setMessageStatus(ERROR);
msgResponse.setErrorMessage("error");
}
return msgResponse;
}
每个 fetchAndUploadCSV() 函数如下所示:
public CompletableFuture<Void> fetchAndUploadCSV1() {
return CompletableFuture.supplyAsync(() -> {
try {
return someService().getAllData1();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).thenAccept(results -> {
try {
if (results.size() > 0) {
csvWriter.uploadAsCsv(results);
}
else {
log.info(" No data found..");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
这就是 csvWriter.uploadAsCsv(results)
的样子 -
public <T> void uploadAsCsv(List<T> objectList) throws Exception {
long objListSize = ((objectList==null) ? 0 : objectList.size());
log.info("Action=Start, objectListSize=" + objListSize);
ByteArrayInputStream inputStream = getCsvAsInputStream(objectList);
Info fileInfo = someClient.uploadFile(inputStream);
log.info("Action=Done, FileInfo=" + ((fileInfo==null ? null : fileInfo.getID())));
}
我在这里使用 OpenCSV 将数据转换为 CSV 流。而且我总能看到最后一行日志。
预期结果:
获取的所有数据、生成的 CSV 和 CustomResponse 应该 return 没有错误消息的处理。
实际结果:
获取所有数据,生成 CSV 并挂起主线程。
您可以在每个创建的 CompletableFuture
上使用 join
而不会牺牲并发性:
public CustomResponse process() {
CustomResponse msgResponse = new CustomResponse();
List<CompletableFuture<Void>> futures = Arrays.asList(dataHelper.fetchAndUploadCSV1(),
dataHelper.fetchAndUploadCSV2(),
dataHelper.fetchAndUploadCSV3(),
dataHelper.fetchAndUploadCSV4(),
dataHelper.fetchAndUploadCSV5(),
dataHelper.fetchAndUploadCSV6(),
dataHelper.fetchAndUploadCSV7());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> {
msgResponse.setProcessed(true);
msgResponse.setMessageStatus("message");
return msgResponse;
})
.exceptionally(throwable -> {
msgResponse.setMessageStatus("ERROR");
msgResponse.setErrorMessage("error");
return msgResponse;
}).join();
}
allOf
returns 一个新的 CompletableFuture
当所有给定的 CompletableFutures 完成时完成。因此,当在 thenApply
中调用 join
时,它会立即 returns。本质上,加入是发生在已经完成的期货上。这样就消除了阻塞。此外,为了处理可能的异常,应该调用 exceptionally
。
我正在编写一个创建多个 (7) CompletableFutures 的函数。这些期货中的每一个基本上都做了两件事:
- 使用 supplyAsync(),从一些数据库中获取数据
- 使用 thenAccept(),将此数据写入 CSV 文件
当7个futures都完成工作后,我想继续进一步的代码执行。所以,我使用 allOf() 然后在 Void CompletableFuture returned 上调用 join() 由 allOf()。
问题是,即使所有 futures 都已执行(我可以看到生成了 CSV),join() 调用仍然卡住,进一步的代码执行永远被阻止。
我尝试了以下方法:
在每个未来之后一个接一个地等待每个未来调用 join() 。这是有效的,但是以并发为代价。我不想这样做。
尝试使用带超时的 get() 而不是 join()。但是,这总是会引发异常(因为 get 总是超时),这是不可取的。
看到这个 JDK 错误:https://bugs.openjdk.java.net/browse/JDK-8200347。不确定这是否是一个类似的问题。
在没有 join() 或 get() 的情况下尝试 运行,这将不会保持线程执行并且再次不可取。
创建所有期货的主要功能。
public CustomResponse process() {
CustomResponse msgResponse = new CustomResponse();
try {
// 1. DbCall 1
CompletableFuture<Void> f1 = dataHelper.fetchAndUploadCSV1();
// 2. DbCall 2
CompletableFuture<Void> f2 = dataHelper.fetchAndUploadCSV2();
// 3. DbCall 3
CompletableFuture<Void> f3 = dataHelper.fetchAndUploadCSV3();
// 4. DbCall 4
CompletableFuture<Void> f4 = dataHelper.fetchAndUploadCSV4();
// 5. DbCall 5
CompletableFuture<Void> f5 = dataHelper.fetchAndUploadCSV5();
// 6. DbCall 6
CompletableFuture<Void> f6 = dataHelper.fetchAndUploadCSV6();
// 7. DbCall 7
CompletableFuture<Void> f7 = dataHelper.fetchAndUploadCSV7();
CompletableFuture<Void>[] fAll = new CompletableFuture[] {f1, f2, f3, f4, f5, f6, f7};
CompletableFuture.allOf(fAll).join();
msgResponse.setProcessed(true);
msgResponse.setMessageStatus("message");
} catch (Exception e) {
msgResponse.setMessageStatus(ERROR);
msgResponse.setErrorMessage("error");
}
return msgResponse;
}
每个 fetchAndUploadCSV() 函数如下所示:
public CompletableFuture<Void> fetchAndUploadCSV1() {
return CompletableFuture.supplyAsync(() -> {
try {
return someService().getAllData1();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).thenAccept(results -> {
try {
if (results.size() > 0) {
csvWriter.uploadAsCsv(results);
}
else {
log.info(" No data found..");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
这就是 csvWriter.uploadAsCsv(results)
的样子 -
public <T> void uploadAsCsv(List<T> objectList) throws Exception {
long objListSize = ((objectList==null) ? 0 : objectList.size());
log.info("Action=Start, objectListSize=" + objListSize);
ByteArrayInputStream inputStream = getCsvAsInputStream(objectList);
Info fileInfo = someClient.uploadFile(inputStream);
log.info("Action=Done, FileInfo=" + ((fileInfo==null ? null : fileInfo.getID())));
}
我在这里使用 OpenCSV 将数据转换为 CSV 流。而且我总能看到最后一行日志。
预期结果: 获取的所有数据、生成的 CSV 和 CustomResponse 应该 return 没有错误消息的处理。
实际结果: 获取所有数据,生成 CSV 并挂起主线程。
您可以在每个创建的 CompletableFuture
上使用 join
而不会牺牲并发性:
public CustomResponse process() {
CustomResponse msgResponse = new CustomResponse();
List<CompletableFuture<Void>> futures = Arrays.asList(dataHelper.fetchAndUploadCSV1(),
dataHelper.fetchAndUploadCSV2(),
dataHelper.fetchAndUploadCSV3(),
dataHelper.fetchAndUploadCSV4(),
dataHelper.fetchAndUploadCSV5(),
dataHelper.fetchAndUploadCSV6(),
dataHelper.fetchAndUploadCSV7());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> {
msgResponse.setProcessed(true);
msgResponse.setMessageStatus("message");
return msgResponse;
})
.exceptionally(throwable -> {
msgResponse.setMessageStatus("ERROR");
msgResponse.setErrorMessage("error");
return msgResponse;
}).join();
}
allOf
returns 一个新的 CompletableFuture
当所有给定的 CompletableFutures 完成时完成。因此,当在 thenApply
中调用 join
时,它会立即 returns。本质上,加入是发生在已经完成的期货上。这样就消除了阻塞。此外,为了处理可能的异常,应该调用 exceptionally
。