关闭 CompletableFuture 链中的外部进程
Closing external process in CompletableFuture chain
我正在寻找 "close" 一些资源的更好方法,这里销毁 CompletableFuture
链中的外部 Process
。现在我的代码大致如下所示:
public CompletableFuture<ExecutionContext> createFuture()
{
final Process[] processHolder = new Process[1];
return CompletableFuture.supplyAsync(
() -> {
try {
processHolder[0] = new ProcessBuilder(COMMAND)
.redirectErrorStream(true)
.start();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return PARSER.parse(processHolder[0].getInputStream());
}, SCHEDULER)
.applyToEither(createTimeoutFuture(DURATION), Function.identity())
.exceptionally(throwable -> {
processHolder[0].destroyForcibly();
if (throwable instanceof TimeoutException) {
throw new DatasourceTimeoutException(throwable);
}
Throwables.propagateIfInstanceOf(throwable, DatasourceException.class);
throw new DatasourceException(throwable);
});
}
我看到的问题是一个 "hacky" 单元素数组,它包含对流程的引用,以便在出现错误时可以将其关闭。是否有一些 CompletableFuture
API 允许将一些 "context" 传递给 exceptionally
(或其他一些方法来实现)?
我正在考虑自定义 CompletionStage
实现,但摆脱 "holder" 变量似乎是一项艰巨的任务。
不需要 CompletableFuture
的线性链。实际上,你已经没有了,因为 createTimeoutFuture(DURATION)
实现超时非常复杂。你可以简单地这样说:
public CompletableFuture<ExecutionContext> createFuture() {
CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
() -> {
try {
return new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, SCHEDULER);
CompletableFuture<ExecutionContext> result
=proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER);
proc.thenAcceptAsync(process -> {
if(!process.waitFor(DURATION, TimeUnit.WHATEVER_DURATION_REFERS_TO)) {
process.destroyForcibly();
result.completeExceptionally(
new DatasourceTimeoutException(new TimeoutException()));
}
});
return result;
}
如果你想保留超时时间,也许你认为进程启动时间很长,你可以使用
public CompletableFuture<ExecutionContext> createFuture() {
CompletableFuture<Throwable> timeout=createTimeoutFuture(DURATION);
CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
() -> {
try {
return new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, SCHEDULER);
CompletableFuture<ExecutionContext> result
=proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER);
timeout.exceptionally(t -> new DatasourceTimeoutException(t))
.thenAcceptBoth(proc, (x, process) -> {
if(process.isAlive()) {
process.destroyForcibly();
result.completeExceptionally(x);
}
});
return result;
}
我自己使用了一个项目数组来模拟 Java 中的正确闭包。
另一种选择是使用带有字段的私有静态 class。优点是它使目的更清晰并且对具有大闭包的垃圾收集器的影响较小,即具有 N 个字段的对象与 N 个长度为 1 的数组。如果您需要关闭相同的字段,它也会变得有用在其他方法中。
这是一个 事实上的 模式,甚至在 CompletableFuture
的范围之外,并且在 lambdas 成为 [=26] 之前很久就被(滥用)使用了=],例如匿名 classes。所以,不要难过,只是 Java 的进化并没有为我们提供适当的关闭(现在?曾经?)。
如果需要,您可以 return 来自 CompletableFuture
的值在 .handle()
内,这样您就可以完整地包装完成结果和 return 包装器。在我看来,这并不比手动关闭好多少,添加了一个事实,即您将在每个未来创建这样的包装器。
Subclassing CompletableFuture
没有必要。您对改变它的行为不感兴趣,只对将数据附加到它感兴趣,这可以通过当前 Java 的最终变量捕获来实现。也就是说,除非您分析并发现创建这些闭包实际上会以某种方式影响性能,我对此深表怀疑。
我正在寻找 "close" 一些资源的更好方法,这里销毁 CompletableFuture
链中的外部 Process
。现在我的代码大致如下所示:
public CompletableFuture<ExecutionContext> createFuture()
{
final Process[] processHolder = new Process[1];
return CompletableFuture.supplyAsync(
() -> {
try {
processHolder[0] = new ProcessBuilder(COMMAND)
.redirectErrorStream(true)
.start();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return PARSER.parse(processHolder[0].getInputStream());
}, SCHEDULER)
.applyToEither(createTimeoutFuture(DURATION), Function.identity())
.exceptionally(throwable -> {
processHolder[0].destroyForcibly();
if (throwable instanceof TimeoutException) {
throw new DatasourceTimeoutException(throwable);
}
Throwables.propagateIfInstanceOf(throwable, DatasourceException.class);
throw new DatasourceException(throwable);
});
}
我看到的问题是一个 "hacky" 单元素数组,它包含对流程的引用,以便在出现错误时可以将其关闭。是否有一些 CompletableFuture
API 允许将一些 "context" 传递给 exceptionally
(或其他一些方法来实现)?
我正在考虑自定义 CompletionStage
实现,但摆脱 "holder" 变量似乎是一项艰巨的任务。
不需要 CompletableFuture
的线性链。实际上,你已经没有了,因为 createTimeoutFuture(DURATION)
实现超时非常复杂。你可以简单地这样说:
public CompletableFuture<ExecutionContext> createFuture() {
CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
() -> {
try {
return new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, SCHEDULER);
CompletableFuture<ExecutionContext> result
=proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER);
proc.thenAcceptAsync(process -> {
if(!process.waitFor(DURATION, TimeUnit.WHATEVER_DURATION_REFERS_TO)) {
process.destroyForcibly();
result.completeExceptionally(
new DatasourceTimeoutException(new TimeoutException()));
}
});
return result;
}
如果你想保留超时时间,也许你认为进程启动时间很长,你可以使用
public CompletableFuture<ExecutionContext> createFuture() {
CompletableFuture<Throwable> timeout=createTimeoutFuture(DURATION);
CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
() -> {
try {
return new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, SCHEDULER);
CompletableFuture<ExecutionContext> result
=proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER);
timeout.exceptionally(t -> new DatasourceTimeoutException(t))
.thenAcceptBoth(proc, (x, process) -> {
if(process.isAlive()) {
process.destroyForcibly();
result.completeExceptionally(x);
}
});
return result;
}
我自己使用了一个项目数组来模拟 Java 中的正确闭包。
另一种选择是使用带有字段的私有静态 class。优点是它使目的更清晰并且对具有大闭包的垃圾收集器的影响较小,即具有 N 个字段的对象与 N 个长度为 1 的数组。如果您需要关闭相同的字段,它也会变得有用在其他方法中。
这是一个 事实上的 模式,甚至在 CompletableFuture
的范围之外,并且在 lambdas 成为 [=26] 之前很久就被(滥用)使用了=],例如匿名 classes。所以,不要难过,只是 Java 的进化并没有为我们提供适当的关闭(现在?曾经?)。
如果需要,您可以 return 来自 CompletableFuture
的值在 .handle()
内,这样您就可以完整地包装完成结果和 return 包装器。在我看来,这并不比手动关闭好多少,添加了一个事实,即您将在每个未来创建这样的包装器。
Subclassing CompletableFuture
没有必要。您对改变它的行为不感兴趣,只对将数据附加到它感兴趣,这可以通过当前 Java 的最终变量捕获来实现。也就是说,除非您分析并发现创建这些闭包实际上会以某种方式影响性能,我对此深表怀疑。