现在关闭 ExecutionException

Shutdown now on ExecutionException

我读了很多关于ExecutorService的post,但我找不到做我需要的方法。

我需要一些并发线程。当它们中的任何一个抛出自定义异常时,所有剩余的任务都会被取消。

这是我所做的一个例子。该任务正在并发工作,但不会因异常而中断。

public class Main {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        List<Future> futures = new ArrayList<Future>();

        futures.add(executorService.submit(new Callable<Void>() {
            public Void call() throws Exception {
                Thread.sleep(5000);
                System.out.println("Task 1 done");
                return null;
            }
        }));

        futures.add(executorService.submit(new Callable<Void>() {
            public Void call() throws Exception {
                Thread.sleep(2000);
                System.out.println("Task 2 done");
                if (true) {
                    throw new CustomException("Error on task 2");
                }
                return null;
            }
        }));
        executorService.shutdown();

        try {
            executeFutures(futures);
        } catch (CustomException ex) {
            System.out.println("Received:" + ex.getMessage());
            executorService.shutdownNow();
        }    
    }

    private static void executeFutures(List<Future> futures) throws CustomException {
        try {
            for (Future f : futures) {
                f.get();
            }
        } catch (ExecutionException | InterruptedException e) {
            if (e.getCause() instanceof CustomException) {
                throw (CustomException) e.getCause();
            }
        }
    }    
}

这是输出:

Task 2 done  //exception is thrown here but task1 continue.
Task 1 done
Received:Error on task 2

任何帮助将不胜感激。

你的问题是由于方法executeFutures使主线程在长任务对应的第一个Future实例上调用f.get(),这使得它等待任务的持续时间,因此无论发生什么至少 5 秒。一旦完成,它将在已经结束的第二个 Future 上调用 f.get(),因此它会立即从 ExecutionException 获取 CustomException 并调用 executorService.shutdownNow() 但它是已经太晚了,因为没有更多的任务可以打断了。

你可以做的是使用类型 Callable 的装饰器,它会在抛出 CustomException 时自动关闭线程池,这样线程池将直接由已执行抛出异常的任务的线程,而不是使用主线程。

像这样:

public class AutoShutdown<V> implements Callable<V> {

    private final ExecutorService executorService;
    private final Callable<V> task;

    public AutoShutdown(final ExecutorService executorService, final Callable<V> task) {
        this.executorService = executorService;
        this.task = task;
    }

    @Override
    public V call() throws Exception {
        try {
            return task.call();
        } catch (CustomException e) {
            executorService.shutdownNow();
            throw e;
        }
    }
}

接下来您需要通过装饰器提交您的任务:

futures.add(
    executorService.submit(
        new AutoShutdown<>(
            executorService,
            new Callable<Void>() {
                public Void call() throws Exception {
                    Thread.sleep(5000);
                    System.out.println("Task 1 done");
                    return null;
                }
            }
        )
    )
);

futures.add(
    executorService.submit(
        new AutoShutdown<>(
            executorService,
            new Callable<Void>() {
                public Void call() throws Exception {
                    Thread.sleep(2000);
                    System.out.println("Task 2 done");
                    if (true) {
                        throw new CustomException("Error on task 2");
                    }
                    return null;
                }
            }
        )
    )
);

输出:

Task 2 done

正如您在输出中看到的那样,任务一很快就被中断了。


The message "Received:Error on task 2" was not thrown, so it looks like a successful execution, and is not the case

不,这只是因为第一次调用 f.get() 会按预期抛出 InterruptedException,这使得它从 executeFutures 退出,因为 catch 是在循环外执行的,将它移到循环内循环如下:

private static void executeFutures(List<Future> futures) throws CustomException {
    for (Future f : futures) {
        try {
            f.get();
        } catch (ExecutionException | InterruptedException e) {
            if (e.getCause() instanceof CustomException) {
                throw (CustomException) e.getCause();
            }
        }
    }
}

输出:

Task 2 done
Received:Error on task 2