当其中一个抛出异常时如何防止执行runnables
How to prevent execution of runnables when one of them throws exception
我有一组元素,我正在为每个元素执行方法,方法是将其作为 Runnable 传递给 CompletableFuture.runAsync()。在执行过程中,可能需要停止整个计算,所以我在执行方法之前检查了一些条件。如果应该停止计算,那么我将抛出一个异常,该异常在 CompletableFuture 之外处理。我想阻止执行所有在抛出异常后执行的 Runnable。所以,换句话说,我不想等待所有 CompletableFutures 完成,当它们中的任何一个抛出异常时。
Set elements = ...
Executor executor = Executors.newFixedThreadPool(N);
try {
CompletableFuture.allOf(elements.stream().map(e - > CompletableFuture.runAsync(() - > {
if (shouldStop()) {
throw new MyException();
}
myMethod(e);
}, executor)).toArray(CompletableFuture[]::new)).join()
} catch (CompletionException e) {
...
}
我对 CompletableFuture
没有太多(嗯!)经验,但我有一个建议(可能有帮助?)
你能在 try 块外的 CompletableFuture.allOf(elements.stream().map
内声明 lambda 吗?这样 non of futures 得到 运行,直到在 try 里面。但它们仍然可以被 catch 块访问。然后您可以在其中 cancel
所有这些。
您应该做的主要事情是 interrupt
您想要更快终止的所有 运行 任务,这意味着这些任务可能需要检查中断,以便它们知道停止它们正在做和终止更快。
此外,您可以在主线程中继续并让它们在后台终止,而不是等待被中断的任务实际终止。
public static void main(String[] args) {
List<Integer> elements = Arrays.asList(5, null, 6, 3, 4); // these elements will fail fast
// List<Integer> elements = Arrays.asList(5, 2, 6, 3, 4); // these elements will succeed
try {
CountDownLatch latch = new CountDownLatch(elements.size());
ExecutorService executor = Executors.newFixedThreadPool(elements.size());
elements.stream().forEach(e -> {
executor.execute(() -> {
try {
doSomething(e);
latch.countDown();
} catch (Exception ex) {
// shutdown executor ASAP on exception, read the docs for `shutdownNow()`
// it will interrupt all tasks in the executor
if (!executor.isShutdown()) {
executor.shutdownNow();
}
for (int i = (int) latch.getCount(); i >= 0; i--) {
latch.countDown();
}
// log the exception
ex.printStackTrace(System.out);
}
});
});
latch.await();
if (executor.isShutdown()) {
System.out.println("Tasks failed! Terminating remaining tasks in the background.");
} else {
executor.shutdown();
System.out.println("Tasks succeeded!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void doSomething(Integer sleepSecs) {
// You will want to check for `interrupted()` throughout the method you want to be able to cancel
if (Thread.interrupted()) {
System.out.println(Thread.currentThread().getName() + " interrupted early");
return;
}
if (sleepSecs == null) {
System.out.println(Thread.currentThread().getName() + " throwing exception ");
throw new RuntimeException();
}
try {
System.out.println(Thread.currentThread().getName() + " started interruptable sleep for " + sleepSecs + "s");
Thread.sleep(sleepSecs * 1000);
System.out.println(Thread.currentThread().getName() + " finished interruptable sleep" + sleepSecs + "s");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " interrupted sleep!");
}
// ...possibly some part of the task that can't be skipped, such as cleanup
System.out.println(Thread.currentThread().getName() + " complete!");
}
发生异常时全部取消即可。障碍在于您在创建它们时并不了解所有这些,并且您不想多次完成这项工作。这可以通过首先创建一个新的空 CompletableFuture
(我们称之为 f1
)来解决。然后,像以前一样创建期货,但在 if(shouldStop()) { … }
语句中插入对 f1.cancel
的调用。然后,在创建所有期货后,将取消所有期货的操作链接到 f1
期货。
取消将有两个目的,它会阻止尚未开始的可运行程序的执行,并且会使 returned by allOf
的未来不等待仍在进行中的完成评价。
由于取消 CompletableFuture
与使用 CancellationException
异常完成它没有什么不同,并且在出现多个异常的情况下,allOf
编辑的未来 return 将报告任意一个,我们可以将 completeExceptionally
与自定义 MyException
一起使用,以确保报告的异常不会是次要的 CancellationException
.
一个独立的例子是:
static final AtomicInteger STOP = new AtomicInteger(2);
static boolean shouldStop() {
return STOP.getAndDecrement() <= 0;
}
static final int N = 10;
public static void main(String[] args) {
Set<Integer> elements = IntStream.range(0, 100).boxed().collect(Collectors.toSet());
ExecutorService executor = Executors.newFixedThreadPool(N);
try {
CompletableFuture<?> cancelAll = new CompletableFuture<>();
CompletableFuture<?>[] all = elements.stream()
.map(e ->
CompletableFuture.runAsync(() -> {
System.out.println("entered "+e);
if(shouldStop()) {
RuntimeException myException = new RuntimeException("stopped");
// alternatively cancelAll.cancel(false);
cancelAll.completeExceptionally(myException);
throw myException;
}
System.out.println("processing "+e);
}, executor))
.toArray(CompletableFuture<?>[]::new);
cancelAll.whenComplete((value,throwable) -> {
if(throwable != null) {
for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);
}
});
CompletableFuture.allOf(all).join();
} catch (CompletionException e) {
e.printStackTrace();
}
executor.shutdown();
}
这将打印类似
的内容
entered 3
entered 8
entered 4
entered 6
entered 1
entered 9
entered 0
entered 7
entered 5
entered 2
entered 10
processing 8
processing 3
java.util.concurrent.CompletionException: java.lang.RuntimeException: stopped
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1423)
at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1144)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at CompletableFutureTest.lambda$main(CompletableFutureTest.java:34)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at CompletableFutureTest.lambda$main[=11=](CompletableFutureTest.java:26)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: stopped
at CompletableFutureTest.lambda$main[=11=](CompletableFutureTest.java:25)
... 4 more
显示由于并发性,一些可运行对象已经 运行 但一旦取消传播,将不会启动后续执行。
请注意,由于 cancelAll
只会在异常情况下完成或根本不会完成,您可以将链接操作简化为 cancelAll.whenComplete((value,throwable) -> { for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable); });
但是否保留冗余检查只是编码风格的问题与否。
您还可以在处理步骤中添加延迟,以查看 allOf(all).join()
如果满足停止条件则不会等待完成。
也可以将一个动作链接到 return 由 runAsync
编辑的期货,这将在任何异常完成时取消所有这些操作,而不仅仅是明确停止。但是,必须注意 return 表示通过 runAsync
安排的操作的原始未来,而不是 whenComplete
编辑的未来 return。
CompletableFuture<?> cancelAll = new CompletableFuture<>();
CompletableFuture<?>[] all = elements.stream()
.map(e -> {
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
System.out.println("entered "+e);
if(shouldStop()) throw new RuntimeException("stopped");
System.out.println("processing "+e);
}, executor);
cf.whenComplete((value,throwable) -> {
if(throwable != null) cancelAll.completeExceptionally(throwable);
});
return cf;
})
.toArray(CompletableFuture<?>[]::new);
cancelAll.whenComplete((value,throwable) -> {
for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);
});
CompletableFuture.allOf(all).join();
我有一组元素,我正在为每个元素执行方法,方法是将其作为 Runnable 传递给 CompletableFuture.runAsync()。在执行过程中,可能需要停止整个计算,所以我在执行方法之前检查了一些条件。如果应该停止计算,那么我将抛出一个异常,该异常在 CompletableFuture 之外处理。我想阻止执行所有在抛出异常后执行的 Runnable。所以,换句话说,我不想等待所有 CompletableFutures 完成,当它们中的任何一个抛出异常时。
Set elements = ...
Executor executor = Executors.newFixedThreadPool(N);
try {
CompletableFuture.allOf(elements.stream().map(e - > CompletableFuture.runAsync(() - > {
if (shouldStop()) {
throw new MyException();
}
myMethod(e);
}, executor)).toArray(CompletableFuture[]::new)).join()
} catch (CompletionException e) {
...
}
我对 CompletableFuture
没有太多(嗯!)经验,但我有一个建议(可能有帮助?)
你能在 try 块外的 CompletableFuture.allOf(elements.stream().map
内声明 lambda 吗?这样 non of futures 得到 运行,直到在 try 里面。但它们仍然可以被 catch 块访问。然后您可以在其中 cancel
所有这些。
您应该做的主要事情是 interrupt
您想要更快终止的所有 运行 任务,这意味着这些任务可能需要检查中断,以便它们知道停止它们正在做和终止更快。
此外,您可以在主线程中继续并让它们在后台终止,而不是等待被中断的任务实际终止。
public static void main(String[] args) {
List<Integer> elements = Arrays.asList(5, null, 6, 3, 4); // these elements will fail fast
// List<Integer> elements = Arrays.asList(5, 2, 6, 3, 4); // these elements will succeed
try {
CountDownLatch latch = new CountDownLatch(elements.size());
ExecutorService executor = Executors.newFixedThreadPool(elements.size());
elements.stream().forEach(e -> {
executor.execute(() -> {
try {
doSomething(e);
latch.countDown();
} catch (Exception ex) {
// shutdown executor ASAP on exception, read the docs for `shutdownNow()`
// it will interrupt all tasks in the executor
if (!executor.isShutdown()) {
executor.shutdownNow();
}
for (int i = (int) latch.getCount(); i >= 0; i--) {
latch.countDown();
}
// log the exception
ex.printStackTrace(System.out);
}
});
});
latch.await();
if (executor.isShutdown()) {
System.out.println("Tasks failed! Terminating remaining tasks in the background.");
} else {
executor.shutdown();
System.out.println("Tasks succeeded!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void doSomething(Integer sleepSecs) {
// You will want to check for `interrupted()` throughout the method you want to be able to cancel
if (Thread.interrupted()) {
System.out.println(Thread.currentThread().getName() + " interrupted early");
return;
}
if (sleepSecs == null) {
System.out.println(Thread.currentThread().getName() + " throwing exception ");
throw new RuntimeException();
}
try {
System.out.println(Thread.currentThread().getName() + " started interruptable sleep for " + sleepSecs + "s");
Thread.sleep(sleepSecs * 1000);
System.out.println(Thread.currentThread().getName() + " finished interruptable sleep" + sleepSecs + "s");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " interrupted sleep!");
}
// ...possibly some part of the task that can't be skipped, such as cleanup
System.out.println(Thread.currentThread().getName() + " complete!");
}
发生异常时全部取消即可。障碍在于您在创建它们时并不了解所有这些,并且您不想多次完成这项工作。这可以通过首先创建一个新的空 CompletableFuture
(我们称之为 f1
)来解决。然后,像以前一样创建期货,但在 if(shouldStop()) { … }
语句中插入对 f1.cancel
的调用。然后,在创建所有期货后,将取消所有期货的操作链接到 f1
期货。
取消将有两个目的,它会阻止尚未开始的可运行程序的执行,并且会使 returned by allOf
的未来不等待仍在进行中的完成评价。
由于取消 CompletableFuture
与使用 CancellationException
异常完成它没有什么不同,并且在出现多个异常的情况下,allOf
编辑的未来 return 将报告任意一个,我们可以将 completeExceptionally
与自定义 MyException
一起使用,以确保报告的异常不会是次要的 CancellationException
.
一个独立的例子是:
static final AtomicInteger STOP = new AtomicInteger(2);
static boolean shouldStop() {
return STOP.getAndDecrement() <= 0;
}
static final int N = 10;
public static void main(String[] args) {
Set<Integer> elements = IntStream.range(0, 100).boxed().collect(Collectors.toSet());
ExecutorService executor = Executors.newFixedThreadPool(N);
try {
CompletableFuture<?> cancelAll = new CompletableFuture<>();
CompletableFuture<?>[] all = elements.stream()
.map(e ->
CompletableFuture.runAsync(() -> {
System.out.println("entered "+e);
if(shouldStop()) {
RuntimeException myException = new RuntimeException("stopped");
// alternatively cancelAll.cancel(false);
cancelAll.completeExceptionally(myException);
throw myException;
}
System.out.println("processing "+e);
}, executor))
.toArray(CompletableFuture<?>[]::new);
cancelAll.whenComplete((value,throwable) -> {
if(throwable != null) {
for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);
}
});
CompletableFuture.allOf(all).join();
} catch (CompletionException e) {
e.printStackTrace();
}
executor.shutdown();
}
这将打印类似
的内容entered 3
entered 8
entered 4
entered 6
entered 1
entered 9
entered 0
entered 7
entered 5
entered 2
entered 10
processing 8
processing 3
java.util.concurrent.CompletionException: java.lang.RuntimeException: stopped
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1423)
at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1144)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at CompletableFutureTest.lambda$main(CompletableFutureTest.java:34)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at CompletableFutureTest.lambda$main[=11=](CompletableFutureTest.java:26)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: stopped
at CompletableFutureTest.lambda$main[=11=](CompletableFutureTest.java:25)
... 4 more
显示由于并发性,一些可运行对象已经 运行 但一旦取消传播,将不会启动后续执行。
请注意,由于 cancelAll
只会在异常情况下完成或根本不会完成,您可以将链接操作简化为 cancelAll.whenComplete((value,throwable) -> { for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable); });
但是否保留冗余检查只是编码风格的问题与否。
您还可以在处理步骤中添加延迟,以查看 allOf(all).join()
如果满足停止条件则不会等待完成。
也可以将一个动作链接到 return 由 runAsync
编辑的期货,这将在任何异常完成时取消所有这些操作,而不仅仅是明确停止。但是,必须注意 return 表示通过 runAsync
安排的操作的原始未来,而不是 whenComplete
编辑的未来 return。
CompletableFuture<?> cancelAll = new CompletableFuture<>();
CompletableFuture<?>[] all = elements.stream()
.map(e -> {
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
System.out.println("entered "+e);
if(shouldStop()) throw new RuntimeException("stopped");
System.out.println("processing "+e);
}, executor);
cf.whenComplete((value,throwable) -> {
if(throwable != null) cancelAll.completeExceptionally(throwable);
});
return cf;
})
.toArray(CompletableFuture<?>[]::new);
cancelAll.whenComplete((value,throwable) -> {
for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);
});
CompletableFuture.allOf(all).join();