在程序退出前执行的非阻塞 CompleteableFuture
Non-Blocking CompleteableFuture that Executes before Program Exits
我正在执行一个应用程序(static void main(String[] args)
,它以特定的节奏向端点异步发出 POST
请求。由于请求的数量和漫长的 运行 时间应用程序,将期货保存在列表中然后事后处理它们是不可行的。
所以,我添加了一个 thenAcceptAsync
调用,它只是在响应进入时记录响应。但是,特别是对于较短的测试 运行s,程序将在一些响应之前完成并退出回复 return.
如何确保在程序完成之前记录所有响应?
public void replayScripts() {
final String applicationId = getSparkApplicationId();
LOGGER.debug("applicationId: {}", applicationId);
final long scriptsToSubmit = this.config.getMaxScripts() == null ? this.config.getScripts().size()
: this.config.getMaxScripts().intValue();
long maxWaitTimeMillis = 0L;
long actualWaitTimeMillis = 0L;
for (int i = 0; i < scriptsToSubmit; i++) {
LOGGER.info("submitting script #{} of {}", i + 1, scriptsToSubmit);
this.dao.submitScript(this.config.getScripts().get(i).getRawScript())
.thenAcceptAsync(this::logResponse);
if (i + 1 < this.config.getScripts().size()) {
final long millisWait = TimeUnit.MILLISECONDS.convert(
Duration.between(this.config.getScripts().get(i).getSubmissionTime(),
this.config.getScripts().get(i + 1).getSubmissionTime()).get(ChronoUnit.NANOS),
TimeUnit.NANOSECONDS);
maxWaitTimeMillis += millisWait;
actualWaitTimeMillis += sleep(millisWait, applicationId);
if (this.config.getClearCache()) {
this.dao.clearCache();
}
}
}
LOGGER.info("max wait time: {}s", maxWaitTimeMillis / 1000.0);
LOGGER.info("actual wait time: {}s", actualWaitTimeMillis / 1000.0);
}
编辑: 解决方案
使用@irahavoi 的建议,我暂时采用了计数器解决方案,添加了这个片段,它使用 AtomicInteger
class 成员来跟踪记录的响应数量:
while (this.config.isLogResponses() && this.loggedResponsesCounter.get() < scriptsToSubmit) {
try {
LOGGER.info("sleeping for one second to allow all responses to be logged");
Thread.sleep(1000);
} catch (final InterruptedException e) {
LOGGER.warn("An exception occurred while attempting to wait for all responses to be logged", e);
}
}
我看到有 2 个选项可以解决这个问题(都不是超级优雅):
分批处理您的请求(例如,每批是 1k 个请求)。并分别处理每批次。这样您就可以在没有 OOM 风险的情况下使用它:
CompletableFuture.allOf(aBatchOfLogPipelineResponses).join();
如上面的评论所述,您可以为收到的回复设置一个计数器(每次收到回复时,它都会递增)。并在您的主线程中定期检查它:如果计数器的值等于 scriptsToSubmit 那么就可以退出了。
另一种可能更清洁的解决方案是关闭您的 ExecutorService
并等待其终止。
我假设您已经在 DAO 中使用了自定义的 ExecutorService
,因为 CompletableFuture
默认使用的公共池不适合 IO 任务。否则,您应该查看 Executors
.
中的工厂方法
因此,一旦您提交了所有任务,您应该在主线程上调用 ExecutorService.shutdown()
, followed by awaitTermination()
。
这将等待所有任务(包括排队的任务)完成执行。
我正在执行一个应用程序(static void main(String[] args)
,它以特定的节奏向端点异步发出 POST
请求。由于请求的数量和漫长的 运行 时间应用程序,将期货保存在列表中然后事后处理它们是不可行的。
所以,我添加了一个 thenAcceptAsync
调用,它只是在响应进入时记录响应。但是,特别是对于较短的测试 运行s,程序将在一些响应之前完成并退出回复 return.
如何确保在程序完成之前记录所有响应?
public void replayScripts() {
final String applicationId = getSparkApplicationId();
LOGGER.debug("applicationId: {}", applicationId);
final long scriptsToSubmit = this.config.getMaxScripts() == null ? this.config.getScripts().size()
: this.config.getMaxScripts().intValue();
long maxWaitTimeMillis = 0L;
long actualWaitTimeMillis = 0L;
for (int i = 0; i < scriptsToSubmit; i++) {
LOGGER.info("submitting script #{} of {}", i + 1, scriptsToSubmit);
this.dao.submitScript(this.config.getScripts().get(i).getRawScript())
.thenAcceptAsync(this::logResponse);
if (i + 1 < this.config.getScripts().size()) {
final long millisWait = TimeUnit.MILLISECONDS.convert(
Duration.between(this.config.getScripts().get(i).getSubmissionTime(),
this.config.getScripts().get(i + 1).getSubmissionTime()).get(ChronoUnit.NANOS),
TimeUnit.NANOSECONDS);
maxWaitTimeMillis += millisWait;
actualWaitTimeMillis += sleep(millisWait, applicationId);
if (this.config.getClearCache()) {
this.dao.clearCache();
}
}
}
LOGGER.info("max wait time: {}s", maxWaitTimeMillis / 1000.0);
LOGGER.info("actual wait time: {}s", actualWaitTimeMillis / 1000.0);
}
编辑: 解决方案
使用@irahavoi 的建议,我暂时采用了计数器解决方案,添加了这个片段,它使用 AtomicInteger
class 成员来跟踪记录的响应数量:
while (this.config.isLogResponses() && this.loggedResponsesCounter.get() < scriptsToSubmit) {
try {
LOGGER.info("sleeping for one second to allow all responses to be logged");
Thread.sleep(1000);
} catch (final InterruptedException e) {
LOGGER.warn("An exception occurred while attempting to wait for all responses to be logged", e);
}
}
我看到有 2 个选项可以解决这个问题(都不是超级优雅):
分批处理您的请求(例如,每批是 1k 个请求)。并分别处理每批次。这样您就可以在没有 OOM 风险的情况下使用它:
CompletableFuture.allOf(aBatchOfLogPipelineResponses).join();
如上面的评论所述,您可以为收到的回复设置一个计数器(每次收到回复时,它都会递增)。并在您的主线程中定期检查它:如果计数器的值等于 scriptsToSubmit 那么就可以退出了。
另一种可能更清洁的解决方案是关闭您的 ExecutorService
并等待其终止。
我假设您已经在 DAO 中使用了自定义的 ExecutorService
,因为 CompletableFuture
默认使用的公共池不适合 IO 任务。否则,您应该查看 Executors
.
因此,一旦您提交了所有任务,您应该在主线程上调用 ExecutorService.shutdown()
, followed by awaitTermination()
。
这将等待所有任务(包括排队的任务)完成执行。