如何使用 ExecutorService 以具体顺序执行线程?

How to execute threads with ExecutorService in concrete order?

在我的程序中我有几个命令,它们由命令执行器class执行。我需要使用 ExecutorService 依次执行 4 个命令(在创建新命令之前不向用户显示)。

执行环境:

public class ConcurrentCommandExecutionEnvironment {
    private static final int POOL_SIZE = 4;

    private static final Logger log = Logger.getLogger(ConcurrentCommandExecutionEnvironment.class);

    public void readArgsAndExecuteCommand(String[] props) {
        if (props.length == 0) {
            throw new IllegalArgumentException("Error: no params entered");
        }

        ExecutorService execService = new ThreadPoolExecutor(
                4,
                4,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()
        );
        ReentrantLock lock = new ReentrantLock();

        CommandStore commandStore = new CommandStore();
        CommandExecutor commandExecutor = new CommandExecutor(
                new CreateUserCommand(commandStore),
                new GetUserListCommand(commandStore),
                new CreateTaskCommand(commandStore),
                new GetTasksListCommand(commandStore),
                new GetTaskByUsernameCommand(commandStore),
                new CompleteTaskCommand(commandStore),
                new DeleteUserCommand(commandStore),
                new CreateUserAndTaskCommand(commandStore)
        );

        execService.execute(() -> {                
                commandExecutor.createUserAndTask(props);           
        });

        execService.execute(() -> {               
                commandExecutor.getUsers(props);
        });

        execService.execute(() -> {                
                commandExecutor.getTasks(props);           
        });

        execService.shutdown();
}

以前我没有使用 ExecutorService 和使用“同步”运算符的同步线程。 我可以在这里使用它吗(使用 commandExecutor 实例作为互斥量并在每个线程中同步它,如下例所示):

execService.execute(() -> {
            synchronized (commandExecutor) {
                commandExecutor.createUserAndTask(props);
      }
  });

或者对于 ExecutorService 我应该以其他方式继续?

Executors.newSingleThreadExecutor

如果您需要多个任务按顺序 运行,请将它们提交到 single-threaded executor service

ExecutorService es = Executors.newSingleThreadExecutor() ;
…
es.submit( task1 ) ;
es.submit( task2 ) ;
es.submit( task3 ) ;

或者重新考虑您是否需要执行程序服务。如果原始线程在单线程执行程序服务中依次等待 运行 一系列任务,则原始线程也可以 运行 任务本身。如果在单个线程上等待,则线程化没有意义。

如果您对使用 Executors.newSingleThreadExecutor() 甚至根本不使用 ExecutorService 的自然顺序执行不感兴趣 :)...那么让我们看一下 [= 的简化说明14=] 是 (Executors.newFixedThreadPool(4)):

final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

final int nThreads = 4;
for (int i = 0; i < nThreads; i++) {
    new Thread(() -> {
        while (true) {
            try {
                tasks.take().run();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }).start();
}

tasks.offer(() -> System.out.println("Task 1"));
tasks.offer(() -> System.out.println("Task 2"));
tasks.offer(() -> System.out.println("Task 3"));

如您所见,inter-task/thread synchronization/communication 没有开箱即用的工具。我们不能直接管理线程,但是我们可以管理我们的任务,这会影响线程的执行。例如:

  1. 上一个任务完成后提交下一个任务。这是一个很自然的解决方案,有时它非常有用(特别是对于计划执行)。您 submit/schedule 一个任务,直到条件为真。按照你的逻辑,检查条件,如果条件为真,submit/schedule 下一个任务。这自然保证了你一次只有一个任务运行(使用任何类型的执行器)。这是如何完成的示例:
    public class CommandChain implements Runnable {
        public static CommandChain start(final ExecutorService executor, final Runnable command) {
            return new CommandChain(executor, command);
        }

        private final ExecutorService executor;
        private final Runnable command;
        private CommandChain then;

        private CommandChain(final ExecutorService executor, final Runnable command) {
            this.executor = executor;
            this.command = command;
        }

        public CommandChain then(final Runnable command) {
            then = new CommandChain(this.executor, command);
            return then;
        }

        @Override
        public void run() {
            command.run();
            if (then != null) {
                executor.submit(then);
            }
        }
    }

    final ExecutorService executor = Executors.newFixedThreadPool(4);

    final CommandChain command1 = CommandChain.start(executor, () -> System.out.println("Command 1"));
    command1.then(() -> System.out.println("Command 2"))
            .then(() -> System.out.println("Command 3"));
    executor.submit(command1);

这个技巧适用于任何类型的 ExecutorService,包括使用任何池大小或只是 Executors.newSingleThreadExecutor()

  1. 在一个任务中,等到前一个任务完成,并在这个特定任务完成时通知下一个任务。这种 1 对 1 waiting/notification 最方便的方法之一是使用 CountDownLatch:
    public class CommandChain implements Runnable {
        public static CommandChain start(final Runnable command) {
            return new CommandChain(null, command);
        }

        private final CountDownLatch waitLatch;
        private final Runnable command;

        private CommandChain then;

        private CommandChain(final CountDownLatch waitLatch, final Runnable command) {
            this.waitLatch = waitLatch;
            this.command = command;
        }

        public CommandChain then(final Runnable command) {
            then = new CommandChain(new CountDownLatch(1), command);
            return then;
        }

        @Override
        public void run() {
            if (waitLatch != null) {
                try {
                    waitLatch.await();
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // we are here, because 
                    // ExecutorService is stopping with
                    // interruption of its workers, so
                    // let's finish the execution
                }
            }
            command.run();
            if (then != null) {
                then.waitLatch.countDown();
            }
        }
    }

    final ExecutorService executor = Executors.newFixedThreadPool(4);

    final CommandChain command1 = CommandChain.start(() -> System.out.println("Command 1"));
    final CommandChain command2 = command1.then(() -> System.out.println("Command 2"));
    final CommandChain command3 = command2.then(() -> System.out.println("Command 3"));

    // the order doesn't matter
    executor.submit(command3);
    executor.submit(command1);
    executor.submit(command2);

此解决方案的缺点是,在最坏的情况下,您应该有足够的线程来执行所有任务 blocking/awaiting - 从链的末端到开始 - 当最后一个命令调用 waitLatch.await()首先,最后一个命令之前的命令调用它的 await 等等......这意味着 n(umber)Threads >= n(umber)Tasks。否则,执行可以停止,只需尝试 Executors.newFixedThreadPool(1) 本例。

我们还可以使用相同的共享 CountDownLatches 或 CyclicBarriers,Phaser