CompletableFuture#whenComplete 如果使用 thenApply 则不调用
CompletableFuture#whenComplete not called if thenApply is used
我有以下代码(来自 )在远程服务器上安排任务,然后使用 ScheduledExecutorService#scheduleAtFixedRate
轮询完成。任务完成后,它会下载结果。
我想 return 一个 Future
给呼叫者,这样他们就可以决定何时阻止以及阻止多长时间,并让他们可以选择取消任务。
我的问题是,如果客户端取消由download
方法编辑的Future
return,whenComplete
块不会执行。如果我删除 thenApply
它会。很明显我对 Future
作文有些误解...我应该更改什么?
public Future<Object> download(Something something) {
String jobId = schedule(something);
CompletableFuture<String> job = pollForCompletion(jobId);
return job.thenApply(this::downloadResult);
}
private CompletableFuture<String> pollForCompletion(String jobId) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
CompletableFuture<String> completionFuture = new CompletableFuture<>();
ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {
if (pollRemoteServer(jobId).equals("COMPLETE")) {
completionFuture.complete(jobId);
}
}, 0, 10, TimeUnit.SECONDS);
completionFuture
.whenComplete((result, thrown) -> {
System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed
checkFuture.cancel(true);
executor.shutdown();
});
return completionFuture;
}
同理,如果我这样做:
return completionFuture.whenComplete(...)
而不是
completionFuture.whenComplete(...);
return completionFuture;
whenComplete
也从不执行。这对我来说似乎非常违反直觉。 whenComplete
编辑的 Future
return 逻辑上不应该是我应该坚持的吗?
编辑:
我更改了我的代码以明确反向传播取消。这是可恶的和不可读的,但它有效,我找不到更好的方法:
public Future<Object> download(Something something) throws ChartDataGenException, Exception {
String jobId = schedule(something);
CompletableFuture<String> job = pollForCompletion(jobId);
CompletableFuture<Object> resulting = job.thenApply(this::download);
resulting.whenComplete((result, thrown) -> {
if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better
job.cancel(true);
}
});
return resulting;
}
编辑 2:
我发现了 tascalate-concurrent,一个很棒的库,它提供了 CompletionStage
的合理实现,支持依赖承诺(通过 DependentPromise
class),可以透明地反向传播取消。似乎非常适合这个用例。
这应该足够了:
DependentPromise
.from(pollForCompletion(jobId))
.thenApply(this::download, true); //true means the cancellation should back-propagate
请注意,我没有测试过这种方法。
你的结构如下:
┌──────────────────┐
│ completionFuture |
└──────────────────┘
↓ ↓
┌──────────────┐ ┌───────────┐
│ whenComplete | │ thenApply |
└──────────────┘ └───────────┘
因此,当您取消 thenApply
未来时,原始 completionFuture
对象不受影响,因为它不依赖于 thenApply
阶段。但是,如果您不链接 thenApply
阶段,您将返回原始 completionFuture
实例并且取消此阶段会导致所有相关阶段的取消,从而导致 whenComplete
操作立即执行。
但是当thenApply
阶段被取消时,completionFuture
仍然可能在pollRemoteServer(jobId).equals("COMPLETE")
条件满足时完成,因为轮询不会停止。但是我们不知道jobId = schedule(something)
和pollRemoteServer(jobId)
的关系。如果您的应用程序状态更改为取消下载后永远无法满足此条件,则此未来将永远不会完成...
关于你的最后一个问题,“我应该持有的那个未来?”,没有要求有一个线性的期货链,事实上,CompletableFuture
的便利方法使得创建这样一个链很容易,但通常情况下,这是最没有用的事情,因为如果你有线性依赖,你可以只写一段代码。您链接两个独立阶段的模型是正确的,但取消不会通过它起作用,但它也不会通过线性链起作用。
如果您希望能够取消源阶段,您需要引用它,但如果您希望能够获得依赖阶段的结果,您也需要对该阶段的引用.
我有以下代码(来自 ScheduledExecutorService#scheduleAtFixedRate
轮询完成。任务完成后,它会下载结果。
我想 return 一个 Future
给呼叫者,这样他们就可以决定何时阻止以及阻止多长时间,并让他们可以选择取消任务。
我的问题是,如果客户端取消由download
方法编辑的Future
return,whenComplete
块不会执行。如果我删除 thenApply
它会。很明显我对 Future
作文有些误解...我应该更改什么?
public Future<Object> download(Something something) {
String jobId = schedule(something);
CompletableFuture<String> job = pollForCompletion(jobId);
return job.thenApply(this::downloadResult);
}
private CompletableFuture<String> pollForCompletion(String jobId) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
CompletableFuture<String> completionFuture = new CompletableFuture<>();
ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {
if (pollRemoteServer(jobId).equals("COMPLETE")) {
completionFuture.complete(jobId);
}
}, 0, 10, TimeUnit.SECONDS);
completionFuture
.whenComplete((result, thrown) -> {
System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed
checkFuture.cancel(true);
executor.shutdown();
});
return completionFuture;
}
同理,如果我这样做:
return completionFuture.whenComplete(...)
而不是
completionFuture.whenComplete(...);
return completionFuture;
whenComplete
也从不执行。这对我来说似乎非常违反直觉。 whenComplete
编辑的 Future
return 逻辑上不应该是我应该坚持的吗?
编辑:
我更改了我的代码以明确反向传播取消。这是可恶的和不可读的,但它有效,我找不到更好的方法:
public Future<Object> download(Something something) throws ChartDataGenException, Exception {
String jobId = schedule(something);
CompletableFuture<String> job = pollForCompletion(jobId);
CompletableFuture<Object> resulting = job.thenApply(this::download);
resulting.whenComplete((result, thrown) -> {
if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better
job.cancel(true);
}
});
return resulting;
}
编辑 2:
我发现了 tascalate-concurrent,一个很棒的库,它提供了 CompletionStage
的合理实现,支持依赖承诺(通过 DependentPromise
class),可以透明地反向传播取消。似乎非常适合这个用例。
这应该足够了:
DependentPromise
.from(pollForCompletion(jobId))
.thenApply(this::download, true); //true means the cancellation should back-propagate
请注意,我没有测试过这种方法。
你的结构如下:
┌──────────────────┐
│ completionFuture |
└──────────────────┘
↓ ↓
┌──────────────┐ ┌───────────┐
│ whenComplete | │ thenApply |
└──────────────┘ └───────────┘
因此,当您取消 thenApply
未来时,原始 completionFuture
对象不受影响,因为它不依赖于 thenApply
阶段。但是,如果您不链接 thenApply
阶段,您将返回原始 completionFuture
实例并且取消此阶段会导致所有相关阶段的取消,从而导致 whenComplete
操作立即执行。
但是当thenApply
阶段被取消时,completionFuture
仍然可能在pollRemoteServer(jobId).equals("COMPLETE")
条件满足时完成,因为轮询不会停止。但是我们不知道jobId = schedule(something)
和pollRemoteServer(jobId)
的关系。如果您的应用程序状态更改为取消下载后永远无法满足此条件,则此未来将永远不会完成...
关于你的最后一个问题,“我应该持有的那个未来?”,没有要求有一个线性的期货链,事实上,CompletableFuture
的便利方法使得创建这样一个链很容易,但通常情况下,这是最没有用的事情,因为如果你有线性依赖,你可以只写一段代码。您链接两个独立阶段的模型是正确的,但取消不会通过它起作用,但它也不会通过线性链起作用。
如果您希望能够取消源阶段,您需要引用它,但如果您希望能够获得依赖阶段的结果,您也需要对该阶段的引用.