如何使用 ExecutorService 以具体顺序执行线程?
How to execute threads with ExecutorService in concrete order?
在我的程序中我有几个命令,它们由命令执行器class执行。我需要使用 ExecutorService 依次执行 4 个命令(在创建新命令之前不向用户显示)。
执行环境:
public class ConcurrentCommandExecutionEnvironment {
private static final int POOL_SIZE = 4;
private static final Logger log = Logger.getLogger(ConcurrentCommandExecutionEnvironment.class);
public void readArgsAndExecuteCommand(String[] props) {
if (props.length == 0) {
throw new IllegalArgumentException("Error: no params entered");
}
ExecutorService execService = new ThreadPoolExecutor(
4,
4,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
ReentrantLock lock = new ReentrantLock();
CommandStore commandStore = new CommandStore();
CommandExecutor commandExecutor = new CommandExecutor(
new CreateUserCommand(commandStore),
new GetUserListCommand(commandStore),
new CreateTaskCommand(commandStore),
new GetTasksListCommand(commandStore),
new GetTaskByUsernameCommand(commandStore),
new CompleteTaskCommand(commandStore),
new DeleteUserCommand(commandStore),
new CreateUserAndTaskCommand(commandStore)
);
execService.execute(() -> {
commandExecutor.createUserAndTask(props);
});
execService.execute(() -> {
commandExecutor.getUsers(props);
});
execService.execute(() -> {
commandExecutor.getTasks(props);
});
execService.shutdown();
}
以前我没有使用 ExecutorService 和使用“同步”运算符的同步线程。
我可以在这里使用它吗(使用 commandExecutor 实例作为互斥量并在每个线程中同步它,如下例所示):
execService.execute(() -> {
synchronized (commandExecutor) {
commandExecutor.createUserAndTask(props);
}
});
或者对于 ExecutorService 我应该以其他方式继续?
Executors.newSingleThreadExecutor
如果您需要多个任务按顺序 运行,请将它们提交到 single-threaded executor service。
ExecutorService es = Executors.newSingleThreadExecutor() ;
…
es.submit( task1 ) ;
es.submit( task2 ) ;
es.submit( task3 ) ;
或者重新考虑您是否需要执行程序服务。如果原始线程在单线程执行程序服务中依次等待 运行 一系列任务,则原始线程也可以 运行 任务本身。如果在单个线程上等待,则线程化没有意义。
如果您对使用 Executors.newSingleThreadExecutor()
甚至根本不使用 ExecutorService
的自然顺序执行不感兴趣 :)...那么让我们看一下 [= 的简化说明14=] 是 (Executors.newFixedThreadPool(4)
):
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
final int nThreads = 4;
for (int i = 0; i < nThreads; i++) {
new Thread(() -> {
while (true) {
try {
tasks.take().run();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Throwable t) {
t.printStackTrace();
}
}
}).start();
}
tasks.offer(() -> System.out.println("Task 1"));
tasks.offer(() -> System.out.println("Task 2"));
tasks.offer(() -> System.out.println("Task 3"));
如您所见,inter-task/thread synchronization/communication 没有开箱即用的工具。我们不能直接管理线程,但是我们可以管理我们的任务,这会影响线程的执行。例如:
- 上一个任务完成后提交下一个任务。这是一个很自然的解决方案,有时它非常有用(特别是对于计划执行)。您 submit/schedule 一个任务,直到条件为真。按照你的逻辑,检查条件,如果条件为真,submit/schedule 下一个任务。这自然保证了你一次只有一个任务运行(使用任何类型的执行器)。这是如何完成的示例:
public class CommandChain implements Runnable {
public static CommandChain start(final ExecutorService executor, final Runnable command) {
return new CommandChain(executor, command);
}
private final ExecutorService executor;
private final Runnable command;
private CommandChain then;
private CommandChain(final ExecutorService executor, final Runnable command) {
this.executor = executor;
this.command = command;
}
public CommandChain then(final Runnable command) {
then = new CommandChain(this.executor, command);
return then;
}
@Override
public void run() {
command.run();
if (then != null) {
executor.submit(then);
}
}
}
final ExecutorService executor = Executors.newFixedThreadPool(4);
final CommandChain command1 = CommandChain.start(executor, () -> System.out.println("Command 1"));
command1.then(() -> System.out.println("Command 2"))
.then(() -> System.out.println("Command 3"));
executor.submit(command1);
这个技巧适用于任何类型的 ExecutorService,包括使用任何池大小或只是 Executors.newSingleThreadExecutor()
- 在一个任务中,等到前一个任务完成,并在这个特定任务完成时通知下一个任务。这种 1 对 1 waiting/notification 最方便的方法之一是使用
CountDownLatch
:
public class CommandChain implements Runnable {
public static CommandChain start(final Runnable command) {
return new CommandChain(null, command);
}
private final CountDownLatch waitLatch;
private final Runnable command;
private CommandChain then;
private CommandChain(final CountDownLatch waitLatch, final Runnable command) {
this.waitLatch = waitLatch;
this.command = command;
}
public CommandChain then(final Runnable command) {
then = new CommandChain(new CountDownLatch(1), command);
return then;
}
@Override
public void run() {
if (waitLatch != null) {
try {
waitLatch.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return; // we are here, because
// ExecutorService is stopping with
// interruption of its workers, so
// let's finish the execution
}
}
command.run();
if (then != null) {
then.waitLatch.countDown();
}
}
}
final ExecutorService executor = Executors.newFixedThreadPool(4);
final CommandChain command1 = CommandChain.start(() -> System.out.println("Command 1"));
final CommandChain command2 = command1.then(() -> System.out.println("Command 2"));
final CommandChain command3 = command2.then(() -> System.out.println("Command 3"));
// the order doesn't matter
executor.submit(command3);
executor.submit(command1);
executor.submit(command2);
此解决方案的缺点是,在最坏的情况下,您应该有足够的线程来执行所有任务 blocking/awaiting - 从链的末端到开始 - 当最后一个命令调用 waitLatch.await()
首先,最后一个命令之前的命令调用它的 await 等等......这意味着 n(umber)Threads >= n(umber)Tasks。否则,执行可以停止,只需尝试 Executors.newFixedThreadPool(1)
本例。
我们还可以使用相同的共享 CountDownLatch
es 或 CyclicBarrier
s,Phaser
等
在我的程序中我有几个命令,它们由命令执行器class执行。我需要使用 ExecutorService 依次执行 4 个命令(在创建新命令之前不向用户显示)。
执行环境:
public class ConcurrentCommandExecutionEnvironment {
private static final int POOL_SIZE = 4;
private static final Logger log = Logger.getLogger(ConcurrentCommandExecutionEnvironment.class);
public void readArgsAndExecuteCommand(String[] props) {
if (props.length == 0) {
throw new IllegalArgumentException("Error: no params entered");
}
ExecutorService execService = new ThreadPoolExecutor(
4,
4,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
ReentrantLock lock = new ReentrantLock();
CommandStore commandStore = new CommandStore();
CommandExecutor commandExecutor = new CommandExecutor(
new CreateUserCommand(commandStore),
new GetUserListCommand(commandStore),
new CreateTaskCommand(commandStore),
new GetTasksListCommand(commandStore),
new GetTaskByUsernameCommand(commandStore),
new CompleteTaskCommand(commandStore),
new DeleteUserCommand(commandStore),
new CreateUserAndTaskCommand(commandStore)
);
execService.execute(() -> {
commandExecutor.createUserAndTask(props);
});
execService.execute(() -> {
commandExecutor.getUsers(props);
});
execService.execute(() -> {
commandExecutor.getTasks(props);
});
execService.shutdown();
}
以前我没有使用 ExecutorService 和使用“同步”运算符的同步线程。 我可以在这里使用它吗(使用 commandExecutor 实例作为互斥量并在每个线程中同步它,如下例所示):
execService.execute(() -> {
synchronized (commandExecutor) {
commandExecutor.createUserAndTask(props);
}
});
或者对于 ExecutorService 我应该以其他方式继续?
Executors.newSingleThreadExecutor
如果您需要多个任务按顺序 运行,请将它们提交到 single-threaded executor service。
ExecutorService es = Executors.newSingleThreadExecutor() ;
…
es.submit( task1 ) ;
es.submit( task2 ) ;
es.submit( task3 ) ;
或者重新考虑您是否需要执行程序服务。如果原始线程在单线程执行程序服务中依次等待 运行 一系列任务,则原始线程也可以 运行 任务本身。如果在单个线程上等待,则线程化没有意义。
如果您对使用 Executors.newSingleThreadExecutor()
甚至根本不使用 ExecutorService
的自然顺序执行不感兴趣 :)...那么让我们看一下 [= 的简化说明14=] 是 (Executors.newFixedThreadPool(4)
):
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
final int nThreads = 4;
for (int i = 0; i < nThreads; i++) {
new Thread(() -> {
while (true) {
try {
tasks.take().run();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Throwable t) {
t.printStackTrace();
}
}
}).start();
}
tasks.offer(() -> System.out.println("Task 1"));
tasks.offer(() -> System.out.println("Task 2"));
tasks.offer(() -> System.out.println("Task 3"));
如您所见,inter-task/thread synchronization/communication 没有开箱即用的工具。我们不能直接管理线程,但是我们可以管理我们的任务,这会影响线程的执行。例如:
- 上一个任务完成后提交下一个任务。这是一个很自然的解决方案,有时它非常有用(特别是对于计划执行)。您 submit/schedule 一个任务,直到条件为真。按照你的逻辑,检查条件,如果条件为真,submit/schedule 下一个任务。这自然保证了你一次只有一个任务运行(使用任何类型的执行器)。这是如何完成的示例:
public class CommandChain implements Runnable {
public static CommandChain start(final ExecutorService executor, final Runnable command) {
return new CommandChain(executor, command);
}
private final ExecutorService executor;
private final Runnable command;
private CommandChain then;
private CommandChain(final ExecutorService executor, final Runnable command) {
this.executor = executor;
this.command = command;
}
public CommandChain then(final Runnable command) {
then = new CommandChain(this.executor, command);
return then;
}
@Override
public void run() {
command.run();
if (then != null) {
executor.submit(then);
}
}
}
final ExecutorService executor = Executors.newFixedThreadPool(4);
final CommandChain command1 = CommandChain.start(executor, () -> System.out.println("Command 1"));
command1.then(() -> System.out.println("Command 2"))
.then(() -> System.out.println("Command 3"));
executor.submit(command1);
这个技巧适用于任何类型的 ExecutorService,包括使用任何池大小或只是 Executors.newSingleThreadExecutor()
- 在一个任务中,等到前一个任务完成,并在这个特定任务完成时通知下一个任务。这种 1 对 1 waiting/notification 最方便的方法之一是使用
CountDownLatch
:
public class CommandChain implements Runnable {
public static CommandChain start(final Runnable command) {
return new CommandChain(null, command);
}
private final CountDownLatch waitLatch;
private final Runnable command;
private CommandChain then;
private CommandChain(final CountDownLatch waitLatch, final Runnable command) {
this.waitLatch = waitLatch;
this.command = command;
}
public CommandChain then(final Runnable command) {
then = new CommandChain(new CountDownLatch(1), command);
return then;
}
@Override
public void run() {
if (waitLatch != null) {
try {
waitLatch.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return; // we are here, because
// ExecutorService is stopping with
// interruption of its workers, so
// let's finish the execution
}
}
command.run();
if (then != null) {
then.waitLatch.countDown();
}
}
}
final ExecutorService executor = Executors.newFixedThreadPool(4);
final CommandChain command1 = CommandChain.start(() -> System.out.println("Command 1"));
final CommandChain command2 = command1.then(() -> System.out.println("Command 2"));
final CommandChain command3 = command2.then(() -> System.out.println("Command 3"));
// the order doesn't matter
executor.submit(command3);
executor.submit(command1);
executor.submit(command2);
此解决方案的缺点是,在最坏的情况下,您应该有足够的线程来执行所有任务 blocking/awaiting - 从链的末端到开始 - 当最后一个命令调用 waitLatch.await()
首先,最后一个命令之前的命令调用它的 await 等等......这意味着 n(umber)Threads >= n(umber)Tasks。否则,执行可以停止,只需尝试 Executors.newFixedThreadPool(1)
本例。
我们还可以使用相同的共享 CountDownLatch
es 或 CyclicBarrier
s,Phaser
等