在线程失败和异常处理时停止 ExecutorService

Stop ExecutorService on thread failure and exception handling

这是我为了暴露我的问题而做的一个简化示例。我有一些任务 doSomeWork(),我使用 ExecutorService(一次最多 4 个线程)以多线程方式处理。但是,如果任何 threads/tasks 产生异常,我想:

  1. 停止处理任何进一步的任务。

  2. 在主线程级别捕获异常。

    public static void main(String[] args) {
        final ExecutorService threadPool = Executors.newFixedThreadPool(4);
        final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(threadPool);
    
        try {
            for (int i = 0; i < 10; i++) {
                int b = i;
                    completionService.submit(() -> doSomeWork(b));
            }
    
            threadPool.shutdown();
            threadPool.awaitTermination(8, TimeUnit.HOURS);
    
            System.exit(0);
    
        } catch (Exception e) {
            System.out.println("Something wrong happened: " + e.getMessage());
        }
    
        System.exit(1);
    
    }
    
    //This function have 50% odds of throwing an exception
    public static Void doSomeWork(int i) throws Exception {
    
        Thread.sleep(500);
        if ((Math.random() > 0.5))
        {
            System.out.println("I have reached indice: " + i);
        }
        else
        {
            throw new Exception("I couldn't handle indice " + i);
        }
        return null;
    }
    

目前,执行会输出如下内容:

I have reached indice: 0
I have reached indice: 2
I have reached indice: 1
I have reached indice: 4
I have reached indice: 6
I have reached indice: 7
I have reached indice: 5
I have reached indice: 9

如您所见,缺少indice 3,但其余线程的执行已完成。它也没有输出任何关于异常的信息。

我想要的输出是这样的:

I have reached indice: 0
I have reached indice: 2
I have reached indice: 1
Something wrong happened: I couldn't handle indice 3

我发现围绕这个问题的其他解决方案是使用可调用的未来但以阻塞方式。我不能在等待未来时阻止其他线程的执行,否则整个多线程毫无意义。

您可以使用 CompletableFuture 来做到这一点。这是我测试的你的主要功能:

final ExecutorService executorService = Executors.newFixedThreadPool(4);
final List<CompletableFuture<Void>> all = new ArrayList<>();

try {
    for (int i = 0; i < 10; i++) {
        int b = i;
        CompletableFuture<Void> v = CompletableFuture.runAsync(() -> {
                    try {
                        doSomeWork(b);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                },
                executorService);
        all.add(v);
    }

    CompletableFuture<Void> placeholder = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
    failFast(all, placeholder);

    System.out.println("All tasks ended");

} catch (Exception e) {
    System.out.println("Something wrong happened: " + e.getMessage());
} finally {
    executorService.shutdownNow();
}

一旦其中一个失败(或所有完成时)使联合未来失败的实用函数:

private static <T> void failFast(List<CompletableFuture<T>> futures, CompletableFuture<T> joint) {
    while (true) {
        if (joint.isDone()) {
            return;
        }
        for (CompletableFuture<T> future : futures) {
            if (future.isCompletedExceptionally()) {
                return;
            }
        }
    }
}

这是我得到的输出:

I have reached indice: 1
I have reached indice: 7
I have reached indice: 5
I have reached indice: 4
Something wrong happened: java.lang.RuntimeException: java.lang.Exception: I couldn't handle indice 0

解释:

方法CompletableFuture.runAsync()允许您提供一个Runnable(您的doSomeWork)和一个具有一定数量线程的执行器。在这里,我传递了一个具有 4 个线程的执行程序(就像您在示例中所做的那样)。

在 运行nable 中,我不仅 运行 doSomeWork 函数,而且我还捕获 Exception 并抛出一个 RuntimeException(需要这样做是因为 Lambda 不支持已检查的异常,所以我需要将它包装成一个 运行time ,但它仍然会中断执行并被你的 main 捕获)。

每次我为具有给定索引 i 的任务创建一个新的 CompletableFuture<Void> 时,我会将此结果存储到一个可完成的期货列表中。

for 循环不会执行任何操作,因为可完成的 futures 运行 是异步的。

因此,我与 CompletableFuture.allOf(...) 创建了一个联合可完成的未来,然后我在这个未来上使用效用函数 failFast 以便在其中一个任务失败时立即停止(或继续,直到所有这些都完成)。

所以基本上,一旦其中一个期货抛出异常,联合期货就被认为已经完成,因此会将句柄留给你的主线程,同时,被 RuntimeException 被扔进了 lambda 表达式。

注意: 感谢 Thomas 的评论,我更新了代码以使用 ExecutorService 而不是简单的 Executor。这允许您在捕获到异常后在 finally 块中调用 shutdownNow() 。 正如 Thomas 所建议的,您也可以直接在 doSomeWork 函数中抛出一个 RuntimeException,这样您就不需要在 lambda 表达式中捕获和包装。

其他注意事项: @matt 让我注意到一些我不知道的事情。 .allOf() 未来将在 ALL 未来完成时完成,无论成功与否。 因此,正如他指出的那样,我的解决方案不会按原样工作。我再次编辑了答案以考虑他的评论,感谢@matt 让我注意到。

听起来您基于错误的假设排除了执行此操作的正确方法。保持你的未来。

List<Future<?>> futures = new ArrayList<>();

然后在你提交的时候。

futures.add( completionService.submit( () -> doSomeWork(b) ) );

现在,您可以在主线程中查看期货状态。

for(Future<?> f: futures){
    try{
        f.get();
    } catch( ExecutionException e){
        //execution exception handled on the main thread.
        completionService.shutdownNow();
    } catch( InterruptedException ie){
        //what should happen here.
    }
}

这样,shutdownNow 就会被调用,所以所有未启动的任务都会返回并且不会启动。

您可以使用 get 的超时来检查每个任务,因为有些任务会 运行 并行。

这是一个完整的可编译示例。

import java.util.concurrent.*;
import java.util.*;

public class ExecutorJunk{
    static int count = 0;
    static void task(){
        int z = count++;
        
        if(z == 3){
            throw new RuntimeException("z is 3");
        }
        
        System.out.println("z: " + z);
        try{ Thread.sleep(1500);} catch(InterruptedException e){};
    }
    
    public static void main(String[] args){
        ExecutorService service = Executors.newFixedThreadPool(4);
        
        List<Future<?>> all = new ArrayList<>();
        
        for(int i = 0; i<10; i++){
            all.add( service.submit(ExecutorJunk::task) );
        }
        service.shutdown();
        
        try{
            while(!service.isTerminated()){
                for(Future f: all){
                    try{
                      f.get(1, TimeUnit.MILLISECONDS);
                    } catch( TimeoutException toe){
                      //pass.
                    }
                }
            }
        } catch(Exception e){
            System.out.println( service.shutdownNow().size() + " tasks not started");
            e.printStackTrace();
        } 
    }
}

当我 运行 我得到了。

z: 0
z: 1
z: 2
z: 4
5 tasks not started
java.util.concurrent.ExecutionException: java.lang.RuntimeException: z is 3
...

它可能可以做得更聪明一点。例如在 get 成功时清除期货列表,而不是使用超时只是检查期货是否完成,然后执行 future.get.