ExecutorService控制的CompletableFuture的注销

Cancellation of CompletableFuture controlled by ExecutorService

我有一个 ExecutorService 将计算的数据转发到 CompletableFuture:

class DataRetriever {
    private final ExecutorService service = ...;

    public CompletableFuture<Data> retrieve() {
        final CompletableFuture<Data> future = new CompletableFuture<>();

        service.execute(() -> {
            final Data data = ... fetch data ...
            future.complete(data);
        });
        return future;
    }
}

我希望 client/user 能够取消任务:

final DataRetriever retriever = new DataRetriever();
final CompletableFuture<Data> future = retriever().retrieve();

future.cancel(true);

这不起作用,因为这会取消外部 CompletableFuture,但不会取消执行程序服务中安排的内部未来。

是否有可能将外部未来的cancel()传播到内部未来?

CompletableFuture#cancel 实际上只是用于将 CompletableFuture 标记为已取消。它不会通知正在执行的任务停止,因为它与任何此类任务都没有关系。

javadoc 暗示了这一点

mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.

您示例中的 inner 未来是 Future,而不是 CompletableFuture,它确实与执行 Runnable(或Callable)。在内部,因为它知道 Thread 任务正在执行什么,它可以向它发送一个 interrupt 来尝试停止它。

一个选择是 return 某种类型的元组(例如某些 POJO),它提供对 CompletableFutureFuture return 的引用由 ExecutorService#submit 编辑。如果需要,您可以使用 Futurecancel。您必须记住 cancelcomplete 您的 CompletableFuture,这样您代码的其他部分就不会永远保持 blocked/starved。

通过向外部未来添加异常处理程序,您可以将对 cancel 的调用传递到内部未来。这是有效的,因为 CompletableFuture.cancel 导致未来以 CancellationException.

异常完成
private final ExecutorService service = ...;

public CompletableFuture<Data> retrieve() {
    final CompletableFuture<Data> outer = new CompletableFuture<>();

    final Future<?> inner = service.submit(() -> {
        ...
        future.complete(data);
    });

    outer.exceptionally((error) -> {
        if (error instanceof CancellationException) {
            inner.cancel(true);
        }
        return null; // must return something, because 'exceptionally' expects a Function
    });

    return outer;
}

调用 outer.exceptionally 会创建一个新的 CompletableFuture,因此它不会影响 outer 本身的取消或异常状态。您仍然可以将您喜欢的任何其他 CompletionStage 添加到 outer,包括另一个 exceptionally 阶段,它将按预期运行。

Pillar 提到的另一个解决方案是扩展 CompletableFuture。这是一种与您现有代码非常相似的方法。它还处理异常,这是一个很好的奖励。

class CancelableFuture<T> extends CompletableFuture<T> {
    private Future<?> inner;

    /**
     * Creates a new CancelableFuture which will be completed by calling the
     * given {@link Callable} via the provided {@link ExecutorService}.
     */
    public CancelableFuture(Callable<T> task, ExecutorService executor) {
        this.inner = executor.submit(() -> complete(task));
    }

    /**
     * Completes this future by executing a {@link Callable}. If the call throws
     * an exception, the future will complete with that exception. Otherwise,
     * the future will complete with the value returned from the callable.
     */
    private void complete(Callable<T> callable) {
        try {
            T result = callable.call();
            complete(result);
        } catch (Exception e) {
            completeExceptionally(e);
        }
    }

    @Override
    public boolean cancel(boolean mayInterrupt) {
        return inner.cancel(mayInterrupt) && super.cancel(true);
    }
}

然后,在DataRetriever中,你可以简单地做:

public CompletableFuture<Data> retrieve() {
    return new CancelableFuture<>(() -> {... fetch data ...}, service);
}

使用我的 Tascalate Concurrent 库,您的代码可以重写如下:

class DataRetriever {
  private final ExecutorService service = ...;

  public Promise<Data> retrieve() {
    return CompletableTask.supplyAsync(() -> {
      final Data data = ... fetch data ...
      return data;
    }, service);
  }
}

PromiseCompletableTask 来自我的图书馆 类,您可以在 my blog

中阅读更多内容