使用FutureClass模拟任务T的多线程进程

Simulate the multi-thread process of task T by using Future Class

这里有一个任务 T:
T由许多子任务组成,每个子任务的完成时间不同。
如果其中一个子任务失败,其他的应该立即停止,任务T也会失败。

那么如何模拟任务T的过程呢? (需要快速失败)
也许 Future Class 可以解决它。但是怎么办?

最简单的解决方法是将这些 Runnable 分离到它们自己的线程池中,这样您就可以仅在该池上调用 shutdownNow(),这会中断该池中的所有任务

下面是一个使用 FutureTask 的例子。

其中一个任务通过等待 1 秒然后取消其他任务来模拟失败。即使他们等待 5 秒,程序也会在 1 秒内完成。

您需要将您的 Runnable 包装在类似这样的东西中,如果实际任务失败可以取消。

为了能够取消任务,它需要处于 Thread.interrupt() 将停止线程的状态。例如,如果它只是处于一个循环中,那么 Future 将被取消(因此 get() 将立即 return),但任务本身将保持 运行.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class FutureTaskExample {

    public static void main(String[] args) throws InterruptedException {
        List<FutureTask<String>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10; ++i) {
            final int ii = i;
            FutureTask<String> task =
                    new FutureTask<>(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            if (ii == 9) {
                                Thread.sleep(1000);
                                tasks.subList(0,9).stream().forEach(t -> t.cancel(true));
                                return "Failure";
                            } else {
                                long start = System.currentTimeMillis();
                                Thread.sleep(5000);
                                System.out.println("Task " + ii + " slept for " + (System.currentTimeMillis() - start));
                                return "Completed";
                            }
                        }
                    });
            tasks.add(task);
            executorService.execute(task);
        }
        List<String> results = tasks.stream().map(t -> {
            try {
                return t.get();
            } catch (Exception e) {
                // ignore
                e.printStackTrace();
                return "Interrupted";
            }
        }).collect(Collectors.toList());
        System.out.println("Completed in " + (System.currentTimeMillis() - start) + " " + results);
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("Done in " + (System.currentTimeMillis() - start));
    }
}

我用CompletableFuture给出一个模拟例子 我创建了一个TaskClassTaskT,让它看起来更真实。而TaskClass包含三个staterunTask()方法和cancel()方法。

  • 三种状态是SuccessCancellingCancelled。成功表示任务运行 completed.Cancelling表示任务正在取消。 Canceled表示任务已经取消,建议其他人赶紧取消任务。
  • runTask() method:我用Thread.sleep(interval)来模拟运行ing状态。而当任务运行完成后,代码if(cancelled) return Result.CANCELLED;检测状态是否被取消?
  • cancel() method:我用double-check lock来装饰“取消”逻辑,确保它是单实例的。

最后,在主要方法中,我使用 CompletableFuture Class 启动线程和 return 参数。 thenAccept() 方法很有用,帮助 SUCCESS TASK 建议其他 运行 TASK 取消任务。

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class Competable {

    // three state after run task
    static enum Result{
        SUCCESS, FAIL, CANCELLED
    }

    // task list
    static List<TaskT> tasks = new ArrayList<>();

    /**
     * Task List
     */
    public static class TaskT{
        private String name;
        private int timeInSecond;
        private Result ret;

        volatile boolean cancelling = false;
        volatile boolean cancelled = false;

        public TaskT(String name, int timeInSecond, Result ret){
            this.name = name;
            this.timeInSecond = timeInSecond * 1000;
            this.ret = ret;
        }

        /**
         * Simulate task runing
         * runing time in real work is uncertain
         * maybe run in computing,maybe run in IO
         */
        public Result runTask(){
            int interval = 100;
            int total = 0;
            try {
                for(;;){
                    Thread.sleep(interval);
                    total += interval;
                    if(total>=timeInSecond) break;
                    if(cancelled) return Result.CANCELLED;

                }
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            System.out.println(name + "Task End!!!");

            return ret;
        }

        /**
         * Simlulate task cancel
         * and set cancel time
         */
        public void cancel() {
            if (!cancelled) {
                synchronized (this) {
                    if (cancelled) return;
                    cancelling = true;
                    System.out.println(name + "cancelling!!!");
                    try {
                        TimeUnit.MILLISECONDS.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(name + "cancelled!!!");
                }
                cancelled = true;
            }
        }

    }

    /**
     * rollback: advice the other thread cancel
     */
    public static void callback(Result result, TaskT task){
        if(Result.FAIL == result){
            for(TaskT _task : tasks){
                if(_task!=task){
                    _task.cancel();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        TaskT subtask1 = new TaskT("task1", 3, Result.SUCCESS);
        TaskT subtask2 = new TaskT("task2", 4, Result.SUCCESS);
        TaskT subtask3 = new TaskT("task3", 1, Result.FAIL);

        tasks.add(subtask1);
        tasks.add(subtask2);
        tasks.add(subtask3);

        for(TaskT task:tasks){
            CompletableFuture f = CompletableFuture.supplyAsync(()->task.runTask())
                    .thenAccept((result -> callback(result, task)));
        }

        // System.in.read();
    }
}