ExecutorService控制的CompletableFuture的注销
Cancellation of CompletableFuture controlled by ExecutorService
我有一个 ExecutorService
将计算的数据转发到 CompletableFuture
:
class DataRetriever {
private final ExecutorService service = ...;
public CompletableFuture<Data> retrieve() {
final CompletableFuture<Data> future = new CompletableFuture<>();
service.execute(() -> {
final Data data = ... fetch data ...
future.complete(data);
});
return future;
}
}
我希望 client/user 能够取消任务:
final DataRetriever retriever = new DataRetriever();
final CompletableFuture<Data> future = retriever().retrieve();
future.cancel(true);
这不起作用,因为这会取消外部 CompletableFuture
,但不会取消执行程序服务中安排的内部未来。
是否有可能将外部未来的cancel()
传播到内部未来?
CompletableFuture#cancel
实际上只是用于将 CompletableFuture
标记为已取消。它不会通知正在执行的任务停止,因为它与任何此类任务都没有关系。
javadoc 暗示了这一点
mayInterruptIfRunning
- this value has no effect in this implementation because interrupts are not used to control processing.
您示例中的 inner 未来是 Future
,而不是 CompletableFuture
,它确实与执行 Runnable
(或Callable
)。在内部,因为它知道 Thread
任务正在执行什么,它可以向它发送一个 interrupt
来尝试停止它。
一个选择是 return 某种类型的元组(例如某些 POJO),它提供对 CompletableFuture
和 Future
return 的引用由 ExecutorService#submit
编辑。如果需要,您可以使用 Future
到 cancel
。您必须记住 cancel
或 complete
您的 CompletableFuture
,这样您代码的其他部分就不会永远保持 blocked/starved。
通过向外部未来添加异常处理程序,您可以将对 cancel
的调用传递到内部未来。这是有效的,因为 CompletableFuture.cancel
导致未来以 CancellationException
.
异常完成
private final ExecutorService service = ...;
public CompletableFuture<Data> retrieve() {
final CompletableFuture<Data> outer = new CompletableFuture<>();
final Future<?> inner = service.submit(() -> {
...
future.complete(data);
});
outer.exceptionally((error) -> {
if (error instanceof CancellationException) {
inner.cancel(true);
}
return null; // must return something, because 'exceptionally' expects a Function
});
return outer;
}
调用 outer.exceptionally
会创建一个新的 CompletableFuture
,因此它不会影响 outer
本身的取消或异常状态。您仍然可以将您喜欢的任何其他 CompletionStage
添加到 outer
,包括另一个 exceptionally
阶段,它将按预期运行。
Pillar 提到的另一个解决方案是扩展 CompletableFuture
。这是一种与您现有代码非常相似的方法。它还处理异常,这是一个很好的奖励。
class CancelableFuture<T> extends CompletableFuture<T> {
private Future<?> inner;
/**
* Creates a new CancelableFuture which will be completed by calling the
* given {@link Callable} via the provided {@link ExecutorService}.
*/
public CancelableFuture(Callable<T> task, ExecutorService executor) {
this.inner = executor.submit(() -> complete(task));
}
/**
* Completes this future by executing a {@link Callable}. If the call throws
* an exception, the future will complete with that exception. Otherwise,
* the future will complete with the value returned from the callable.
*/
private void complete(Callable<T> callable) {
try {
T result = callable.call();
complete(result);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public boolean cancel(boolean mayInterrupt) {
return inner.cancel(mayInterrupt) && super.cancel(true);
}
}
然后,在DataRetriever
中,你可以简单地做:
public CompletableFuture<Data> retrieve() {
return new CancelableFuture<>(() -> {... fetch data ...}, service);
}
使用我的 Tascalate Concurrent 库,您的代码可以重写如下:
class DataRetriever {
private final ExecutorService service = ...;
public Promise<Data> retrieve() {
return CompletableTask.supplyAsync(() -> {
final Data data = ... fetch data ...
return data;
}, service);
}
}
Promise
和 CompletableTask
来自我的图书馆 类,您可以在 my blog
中阅读更多内容
我有一个 ExecutorService
将计算的数据转发到 CompletableFuture
:
class DataRetriever {
private final ExecutorService service = ...;
public CompletableFuture<Data> retrieve() {
final CompletableFuture<Data> future = new CompletableFuture<>();
service.execute(() -> {
final Data data = ... fetch data ...
future.complete(data);
});
return future;
}
}
我希望 client/user 能够取消任务:
final DataRetriever retriever = new DataRetriever();
final CompletableFuture<Data> future = retriever().retrieve();
future.cancel(true);
这不起作用,因为这会取消外部 CompletableFuture
,但不会取消执行程序服务中安排的内部未来。
是否有可能将外部未来的cancel()
传播到内部未来?
CompletableFuture#cancel
实际上只是用于将 CompletableFuture
标记为已取消。它不会通知正在执行的任务停止,因为它与任何此类任务都没有关系。
javadoc 暗示了这一点
mayInterruptIfRunning
- this value has no effect in this implementation because interrupts are not used to control processing.
您示例中的 inner 未来是 Future
,而不是 CompletableFuture
,它确实与执行 Runnable
(或Callable
)。在内部,因为它知道 Thread
任务正在执行什么,它可以向它发送一个 interrupt
来尝试停止它。
一个选择是 return 某种类型的元组(例如某些 POJO),它提供对 CompletableFuture
和 Future
return 的引用由 ExecutorService#submit
编辑。如果需要,您可以使用 Future
到 cancel
。您必须记住 cancel
或 complete
您的 CompletableFuture
,这样您代码的其他部分就不会永远保持 blocked/starved。
通过向外部未来添加异常处理程序,您可以将对 cancel
的调用传递到内部未来。这是有效的,因为 CompletableFuture.cancel
导致未来以 CancellationException
.
private final ExecutorService service = ...;
public CompletableFuture<Data> retrieve() {
final CompletableFuture<Data> outer = new CompletableFuture<>();
final Future<?> inner = service.submit(() -> {
...
future.complete(data);
});
outer.exceptionally((error) -> {
if (error instanceof CancellationException) {
inner.cancel(true);
}
return null; // must return something, because 'exceptionally' expects a Function
});
return outer;
}
调用 outer.exceptionally
会创建一个新的 CompletableFuture
,因此它不会影响 outer
本身的取消或异常状态。您仍然可以将您喜欢的任何其他 CompletionStage
添加到 outer
,包括另一个 exceptionally
阶段,它将按预期运行。
Pillar 提到的另一个解决方案是扩展 CompletableFuture
。这是一种与您现有代码非常相似的方法。它还处理异常,这是一个很好的奖励。
class CancelableFuture<T> extends CompletableFuture<T> {
private Future<?> inner;
/**
* Creates a new CancelableFuture which will be completed by calling the
* given {@link Callable} via the provided {@link ExecutorService}.
*/
public CancelableFuture(Callable<T> task, ExecutorService executor) {
this.inner = executor.submit(() -> complete(task));
}
/**
* Completes this future by executing a {@link Callable}. If the call throws
* an exception, the future will complete with that exception. Otherwise,
* the future will complete with the value returned from the callable.
*/
private void complete(Callable<T> callable) {
try {
T result = callable.call();
complete(result);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public boolean cancel(boolean mayInterrupt) {
return inner.cancel(mayInterrupt) && super.cancel(true);
}
}
然后,在DataRetriever
中,你可以简单地做:
public CompletableFuture<Data> retrieve() {
return new CancelableFuture<>(() -> {... fetch data ...}, service);
}
使用我的 Tascalate Concurrent 库,您的代码可以重写如下:
class DataRetriever {
private final ExecutorService service = ...;
public Promise<Data> retrieve() {
return CompletableTask.supplyAsync(() -> {
final Data data = ... fetch data ...
return data;
}, service);
}
}
Promise
和 CompletableTask
来自我的图书馆 类,您可以在 my blog