关闭 CompletableFuture 链中的外部进程

Closing external process in CompletableFuture chain

我正在寻找 "close" 一些资源的更好方法,这里销毁 CompletableFuture 链中的外部 Process。现在我的代码大致如下所示:

public CompletableFuture<ExecutionContext> createFuture()
{
    final Process[] processHolder = new Process[1];
    return CompletableFuture.supplyAsync(
            () -> {
                try {
                    processHolder[0] = new ProcessBuilder(COMMAND)
                            .redirectErrorStream(true)
                            .start();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return PARSER.parse(processHolder[0].getInputStream());
            }, SCHEDULER)
            .applyToEither(createTimeoutFuture(DURATION), Function.identity())
            .exceptionally(throwable -> {
                processHolder[0].destroyForcibly();
                if (throwable instanceof TimeoutException) {
                    throw new DatasourceTimeoutException(throwable);
                }
                Throwables.propagateIfInstanceOf(throwable, DatasourceException.class);
                throw new DatasourceException(throwable);
            });
}

我看到的问题是一个 "hacky" 单元素数组,它包含对流程的引用,以便在出现错误时可以将其关闭。是否有一些 CompletableFuture API 允许将一些 "context" 传递给 exceptionally (或其他一些方法来实现)?

我正在考虑自定义 CompletionStage 实现,但摆脱 "holder" 变量似乎是一项艰巨的任务。

不需要 CompletableFuture 的线性链。实际上,你已经没有了,因为 createTimeoutFuture(DURATION) 实现超时非常复杂。你可以简单地这样说:

public CompletableFuture<ExecutionContext> createFuture() {
    CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
        () -> {
            try {
                return new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, SCHEDULER);
    CompletableFuture<ExecutionContext> result
        =proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER);
    proc.thenAcceptAsync(process -> {
        if(!process.waitFor(DURATION, TimeUnit.WHATEVER_DURATION_REFERS_TO)) {
            process.destroyForcibly();
            result.completeExceptionally(
                new DatasourceTimeoutException(new TimeoutException()));
        }
    });
    return result;
}

如果你想保留超时时间,也许你认为进程启动时间很长,你可以使用

public CompletableFuture<ExecutionContext> createFuture() {
    CompletableFuture<Throwable> timeout=createTimeoutFuture(DURATION);
    CompletableFuture<Process> proc=CompletableFuture.supplyAsync(
        () -> {
            try {
                return new ProcessBuilder(COMMAND).redirectErrorStream(true).start();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, SCHEDULER);
    CompletableFuture<ExecutionContext> result
        =proc.thenApplyAsync(process -> PARSER.parse(process.getInputStream()), SCHEDULER);
    timeout.exceptionally(t -> new DatasourceTimeoutException(t))
           .thenAcceptBoth(proc, (x, process) -> {
                if(process.isAlive()) {
                    process.destroyForcibly();
                    result.completeExceptionally(x);
                }
            });
    return result;
}

我自己使用了一个项目数组来模拟 Java 中的正确闭包。

另一种选择是使用带有字段的私有静态 class。优点是它使目的更清晰并且对具有大闭包的垃圾收集器的影响较小,即具有 N 个字段的对象与 N 个长度为 1 的数组。如果您需要关闭相同的字段,它也会变得有用在其他方法中。

这是一个 事实上的 模式,甚至在 CompletableFuture 的范围之外,并且在 lambdas 成为 [=26] 之前很久就被(滥用)使用了=],例如匿名 classes。所以,不要难过,只是 Java 的进化并没有为我们提供适当的关闭(现在?曾经?)。

如果需要,您可以 return 来自 CompletableFuture 的值在 .handle() 内,这样您就可以完整地包装完成结果和 return 包装器。在我看来,这并不比手动关闭好多少,添加了一个事实,即您将在每个未来创建这样的包装器。

Subclassing CompletableFuture 没有必要。您对改变它的行为不感兴趣,只对将数据附加到它感兴趣,这可以通过当前 Java 的最终变量捕获来实现。也就是说,除非您分析并发现创建这些闭包实际上会以某种方式影响性能,我对此深表怀疑。