如何强制超时和取消异步 CompletableFuture 作业
How to enforce timeout and cancel async CompletableFuture Jobs
我正在使用 Java 8,我想知道在 3 个异步作业上强制执行超时的推荐方法,我将执行异步作业并从未来检索结果。请注意,所有 3 个作业的超时时间都相同。如果超过时间限制我也想取消作业
我在想这样的事情:
// Submit jobs async
List<CompletableFuture<String>> futures = submitJobs(); // Uses CompletableFuture.supplyAsync
List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
allFutures.get(100L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
for(CompletableFuture f : future) {
if(!f.isDone()) {
/*
From Java Doc:
@param mayInterruptIfRunning this value has no effect in this
* implementation because interrupts are not used to control
* processing.
*/
f.cancel(true);
}
}
}
List<String> output = new ArrayList<>();
for(CompeletableFuture fu : futures) {
if(!fu.isCancelled()) { // Is this needed?
output.add(fu.join());
}
}
return output;
- 这样的东西行得通吗?有没有更好的方法?
- 如何正确取消未来? Java 医生说,线程不能被中断?那么,如果我要取消一个 future 并调用
join()
,我会立即得到结果吗,因为线程不会被中断?
- 等待结束后,推荐使用join()还是get()获取结果?
值得注意的是,在 CompletableFuture 上调用 cancel 与在当前阶段调用 completeExceptionally 实际上是一样的。取消不会影响之前的阶段。话虽如此:
- 原则上,假设不需要上游取消,这样的事情就可以工作(从伪代码的角度来看,上面有语法错误)。
- CompletableFuture取消不会打断当前线程。取消将导致所有下游阶段立即被 CancellationException 触发(将使执行流程短路)。
- 'join' 和 'get' 在调用者愿意无限期等待的情况下实际上是相同的。加入句柄为您包装已检查的异常。如果调用者想要超时,则需要get。
包括一个片段来说明取消时的行为。请注意下游流程如何不会启动,但上游流程即使在取消后也会继续。
public static void main(String[] args) throws Exception
{
int maxSleepTime = 1000;
Random random = new Random();
AtomicInteger value = new AtomicInteger();
List<String> calculatedValues = new ArrayList<>();
Supplier<String> process = () -> { try { Thread.sleep(random.nextInt(maxSleepTime)); System.out.println("Stage 1 Running!"); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.toString(value.getAndIncrement()); };
List<CompletableFuture<String>> stage1 = IntStream.range(0, 10).mapToObj(val -> CompletableFuture.supplyAsync(process)).collect(Collectors.toList());
List<CompletableFuture<String>> stage2 = stage1.stream().map(Test::appendNumber).collect(Collectors.toList());
List<CompletableFuture<String>> stage3 = stage2.stream().map(Test::printIfCancelled).collect(Collectors.toList());
CompletableFuture<Void> awaitAll = CompletableFuture.allOf(stage2.toArray(new CompletableFuture[0]));
try
{
/*Wait 1/2 the time, some should be complete. Some not complete -> TimeoutException*/
awaitAll.get(maxSleepTime / 2, TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex)
{
for(CompletableFuture<String> toCancel : stage2)
{
boolean irrelevantValue = false;
if(!toCancel.isDone())
toCancel.cancel(irrelevantValue);
else
calculatedValues.add(toCancel.join());
}
}
System.out.println("All futures Cancelled! But some Stage 1's may still continue printing anyways.");
System.out.println("Values returned as of cancellation: " + calculatedValues);
Thread.sleep(maxSleepTime);
}
private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 2 Running"); return "#" + val; });
}
private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 3 Running!"); return val; }).exceptionally(ex -> { System.out.println("Stage 3 Cancelled!"); return ex.getMessage(); });
}
如果需要取消上游进程(例如:取消某些网络调用),则需要自定义处理。
调用cancel后你不能加入furure,因为你得到一个异常。
终止计算的一种方法是让它拥有对未来的引用并定期检查它:如果它被取消,则从内部中止计算。
如果计算是一个循环,您可以在每次迭代中进行检查,就可以做到这一点。
你需要它成为 CompletableFuture 吗?另一种方法是避免使用 CompleatableFuture,而是使用简单的 Future 或 FutureTask:如果您使用执行器调用 future.cancel(true) 执行它,则可能会终止计算。
回答问题:“调用join(),我会立即得到结果吗”。
不,你不会立即得到它,它会挂起并等待完成计算:没有办法强制在更短的时间内完成需要很长时间的计算。
您可以调用 future.complete(value) 提供一个值,供引用该未来的其他线程用作默认结果。
我正在使用 Java 8,我想知道在 3 个异步作业上强制执行超时的推荐方法,我将执行异步作业并从未来检索结果。请注意,所有 3 个作业的超时时间都相同。如果超过时间限制我也想取消作业
我在想这样的事情:
// Submit jobs async
List<CompletableFuture<String>> futures = submitJobs(); // Uses CompletableFuture.supplyAsync
List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
allFutures.get(100L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
for(CompletableFuture f : future) {
if(!f.isDone()) {
/*
From Java Doc:
@param mayInterruptIfRunning this value has no effect in this
* implementation because interrupts are not used to control
* processing.
*/
f.cancel(true);
}
}
}
List<String> output = new ArrayList<>();
for(CompeletableFuture fu : futures) {
if(!fu.isCancelled()) { // Is this needed?
output.add(fu.join());
}
}
return output;
- 这样的东西行得通吗?有没有更好的方法?
- 如何正确取消未来? Java 医生说,线程不能被中断?那么,如果我要取消一个 future 并调用
join()
,我会立即得到结果吗,因为线程不会被中断? - 等待结束后,推荐使用join()还是get()获取结果?
值得注意的是,在 CompletableFuture 上调用 cancel 与在当前阶段调用 completeExceptionally 实际上是一样的。取消不会影响之前的阶段。话虽如此:
- 原则上,假设不需要上游取消,这样的事情就可以工作(从伪代码的角度来看,上面有语法错误)。
- CompletableFuture取消不会打断当前线程。取消将导致所有下游阶段立即被 CancellationException 触发(将使执行流程短路)。
- 'join' 和 'get' 在调用者愿意无限期等待的情况下实际上是相同的。加入句柄为您包装已检查的异常。如果调用者想要超时,则需要get。
包括一个片段来说明取消时的行为。请注意下游流程如何不会启动,但上游流程即使在取消后也会继续。
public static void main(String[] args) throws Exception
{
int maxSleepTime = 1000;
Random random = new Random();
AtomicInteger value = new AtomicInteger();
List<String> calculatedValues = new ArrayList<>();
Supplier<String> process = () -> { try { Thread.sleep(random.nextInt(maxSleepTime)); System.out.println("Stage 1 Running!"); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.toString(value.getAndIncrement()); };
List<CompletableFuture<String>> stage1 = IntStream.range(0, 10).mapToObj(val -> CompletableFuture.supplyAsync(process)).collect(Collectors.toList());
List<CompletableFuture<String>> stage2 = stage1.stream().map(Test::appendNumber).collect(Collectors.toList());
List<CompletableFuture<String>> stage3 = stage2.stream().map(Test::printIfCancelled).collect(Collectors.toList());
CompletableFuture<Void> awaitAll = CompletableFuture.allOf(stage2.toArray(new CompletableFuture[0]));
try
{
/*Wait 1/2 the time, some should be complete. Some not complete -> TimeoutException*/
awaitAll.get(maxSleepTime / 2, TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex)
{
for(CompletableFuture<String> toCancel : stage2)
{
boolean irrelevantValue = false;
if(!toCancel.isDone())
toCancel.cancel(irrelevantValue);
else
calculatedValues.add(toCancel.join());
}
}
System.out.println("All futures Cancelled! But some Stage 1's may still continue printing anyways.");
System.out.println("Values returned as of cancellation: " + calculatedValues);
Thread.sleep(maxSleepTime);
}
private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 2 Running"); return "#" + val; });
}
private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 3 Running!"); return val; }).exceptionally(ex -> { System.out.println("Stage 3 Cancelled!"); return ex.getMessage(); });
}
如果需要取消上游进程(例如:取消某些网络调用),则需要自定义处理。
调用cancel后你不能加入furure,因为你得到一个异常。
终止计算的一种方法是让它拥有对未来的引用并定期检查它:如果它被取消,则从内部中止计算。 如果计算是一个循环,您可以在每次迭代中进行检查,就可以做到这一点。
你需要它成为 CompletableFuture 吗?另一种方法是避免使用 CompleatableFuture,而是使用简单的 Future 或 FutureTask:如果您使用执行器调用 future.cancel(true) 执行它,则可能会终止计算。
回答问题:“调用join(),我会立即得到结果吗”。
不,你不会立即得到它,它会挂起并等待完成计算:没有办法强制在更短的时间内完成需要很长时间的计算。
您可以调用 future.complete(value) 提供一个值,供引用该未来的其他线程用作默认结果。