递归取消 allOf CompletableFuture

Recursively cancel an allOf CompletableFuture

如果我有

CompletableFuture<Something> future1 = service.request(param1);
CompletableFuture<Something> future2 = service.request(param2);
CompletableFuture<Void> many = CompletableFuture.allOf(future1, future2);

当我这样做时会发生什么 many.cancel()future1future2 也会取消吗?如果没有,实现这一目标的最干净的方法是什么?我舍不得future1future2,只是为了能在我想取消的时候取消它们many

关于我为什么想要这个的一些背景:当接收到一段数据时,我需要请求匹配,可能是未来的数据来执行计算。如果有更新的数据到来,我想取消之前计算的完成,因为结果会立即被新的计算所取代。

CompletableFuture.allOf 构建的树不包含对 CompletableFuture 的给定实例的任何引用。相反,如果只是构建 completion 树,当所有给定的 CompletableFutures 完成时 完成 (来自 JavaDocs)。

所以您可能必须保留对所有 CompletableFuture 的引用,以便在需要时按顺序取消它们。

在让自己的生活变得比必要的更艰难之前,您应该了解取消 CompletableFuture 的实际作用。最重要的是,它不会停止相关计算。

如果与 CompletableFuture 关联的计算已经 运行,但尚未完成,取消 CompletableFuture 会将其变为“已取消”状态,这可能有一个对所有相关阶段立即产生影响,但不会对计算产生影响,计算将一直持续到完成,尽管它试图完成已取消的未来不会产生任何影响。

虽然其他 Future 可能会因中断而被取消,这将停止计算,如果它检查中断,这不适用于 CompletableFuture,请参阅 CompletableFuture.cancel(boolean) :

Parameters:

mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.

因此,当您成功取消 future1future2 时,唯一的即时效果是取消 many,您也可以通过调用 [=21] 来实现=] 在 many 本身。如果有更多的依赖阶段,它会产生更广泛的影响,但既然你说过,你不想保留对 future1future2 的引用,情况似乎并非如此.

以下代码演示了该行为:

CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
    System.out.println("supplying value");
    return "foo";
});
CompletableFuture<String> then = supply.thenApply(s -> {
    System.out.println("Evaluating next stage");
    return s;
});
CompletableFuture<?> last = then.handle((s,t) -> {
    System.out.println("last stage: value: "+s+", throwable: "+t);
    return "";
});
System.out.println("cancelling: "+supply.cancel(true));
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

此代码可重现打印:

last stage: value: null, throwable: java.util.concurrent.CompletionException: java.util.concurrent.CancellationException
canceling: true
supplying value

(顺序可能会改变)

不管你是调用supply.cancel(true)还是then.cancel(true)或者是传递true还是false;它不会停止正在进行的 Supplier 评估。

如果关联的计算还没有开始,它会在开始时检查取消状态,就像 CompletableFuture 中的便捷方法产生的动作一样,会有区别。这种情况很少见,通常情况下,您的 service.request(paramN) 调用应该会触发评估。

它是 CompletableFuture 的基础 属性,顾名思义,它是 可完成的 ,即任何人都可以调用 complete因此,CompletableFuture 无法控制将来可能最终调用 complete 的人。因此,cancel 所能实现的就是将其设置为已取消状态,这意味着忽略后续完成尝试并将取消向下传播到相关操作。


所以最重要的是,您可能已经可以在 many 实例上调用 cancel,因为在 future1 和 [=19 上调用 cancel =] 不太可能产生值得您的代码复杂化的效果。

你可以试试我的图书馆:https://github.com/vsilaev/tascalate-concurrent

它提供了真正可取消的 CompletionStage 实现 (CompletableTask) 以及组合它们的方法 (Promises.all)

CompletionStage<Something> future1 = CompletableTask
  .complete(param1, myExecutor).thenApplyAsync(this::serviceCall);
CompletionStage<Something> future2 = CompletableTask
  .complete(param2, myExecutor).thenApplyAsync(this::serviceCall);

Promise<List<Something>> many = Promises.all(future1, future2);

现在您可以调用 many.cancel(true) 并且 future1future2 都将被取消(如果尚未完成)。此外,如果其中一个期货异常完成,则另一个期货将自动取消(同样,如果尚未完成)。