自定义多线程:限制某种类型的任务并行执行的数量,不限制其他类型的任务
Customized multithreading: limiting number of tasks of some type to be executed in parallel, without limiting tasks of other types
我有 3 种类型的任务:A, B, C
。
而且我想 运行 在 N 个线程中并行执行这些任务。假设任务列表如下:
A, B, C, B, C, A, B, C
当然,我可以使用ExecutorService
实现多线程执行,但问题是我一次最多需要执行一个C
类型的任务。 类型 C
的其他任务必须按顺序执行,但与任务 A
and/or B
.
并行执行
例如3线程执行器可能处于以下任何状态:
A B C
A A A
A C B
B B C
B B B
B C
A C
A B
C
...
(允许同时执行多个A类或B类任务,但最多只能同时执行一个C类任务)
在Java中有什么方法可以做到这一点吗?
更新
这是我想出的方法这是正确的方法吗
在这里,我通过 ExecutorService
执行所有任务,在执行时我会检查是否有任何其他 C 任务是 Running.If 不是我会执行,否则我会将它添加到队列中,该队列将在任何其他任务的成功完成并且我还检查任何 C 任务是 运行ning 或 Not
public class Test {
public void startExecution() {
Queue<String> runQ = new LinkedList<>();
ThreadPool exec = (ThreadPool) Executors.newFixedThreadPool(RunSettings.getRunSettings().getThreadCount());
while (!runQ.isEmpty() && !SystemDefaults.stopExecution.get()) {
String TaskName = runQ.remove();
Task t = new Task(TaskName);
exec.execute(t, TaskName);
}
exec.shutdown();
if (exec.awaitTermination(RunSettings.getRunSettings().getExecutionTimeOut(), TimeUnit.MINUTES)) {
System.out.println("[CONTROL: ALL TEST TASKS COMPLETED SUCCESSFULLY.]");
} else {
System.out.println("[CONTROL: ALL THE TEST TASKS DID NOT COMPLETE SUCCESSFULLY IN STIPULATED TIME. FORCEFULLY FINALIZING.]");
exec.shutdownNow();
}
}
}
我创建的ThreadPool
public class ThreadPool extends ThreadPoolExecutor {
public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
final String CTask = "TaskC";
Map<Runnable, String> TaskPool = new HashMap<>();
Queue<Runnable> TaskCList = new LinkedList<>();
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (TaskPool.containsKey(r)) {
TaskPool.remove(r);
}
if (!TaskPool.containsValue(CTask) && !TaskCList.isEmpty()) {
Runnable ieRun = TaskCList.remove();
super.execute(ieRun);
TaskPool.put(ieRun, CTask);
}
}
public void execute(Runnable command, String TaskType) {
if (TaskPool.containsValue(TaskType)
&& TaskType.equalsIgnoreCase(CTask)) {
System.out.println("Another Instance of " + CTask + " Running");
TaskCList.add(command);
} else {
super.execute(command);
TaskPool.put(command, TaskType);
}
}
}
最简单的方法是创建 2 个执行器:一个单线程用于 C 类型的任务,另一个多线程用于其他类型的任务:
class ExecutorWrapper {
private ExecutorService forC = Executors.newSingleThreadExecutor();
private ExecutorService forAnother = Executors.newFixedThreadPool(THREAD_NUMBER);
public void acceptTask(Runnable r) {
if (r instanceof TaskC) {
forC.execute(r);
} else {
forAnother.execute(r);
}
}
}
现在,任何类型 C 的任务都将在 forC
执行器内部队列中等待,直到另一个此类任务完成。
如果您不希望创建另一个执行程序,则需要实施某种并发控制,这要复杂得多,并且由于可能会出现竞争条件而难以调试。我可以提出解决方案草案,但没有代码:
- 创建一个标志以指示另一个任务 C 是否已在执行,以及另一个任务 C 将等待的队列
- 当类型C的任务到达时,检查是否有另一个任务C正在执行,如果是,则将其加入上述队列
- 在任务 C 完成时,发送某种任务 C 已完成的通知——以便从提到的队列中取出下一个任务 C 并将其发送到执行器。如果队列为空,则清除标志以指示现在没有任务 C 正在执行。这种通知可以通过用
Callable
包装任务 C 并调用 Future#get
方法来实现,该方法将阻塞直到任务完成。
看起来你需要做的就是制作任务 C synchronized
。这里有一些测试代码似乎证明这就足够了——尽管没有失败并不总是意味着成功。
结果清楚地显示了 A 和 B 的 运行 是平行的,但 C 没有。
static enum Task implements Callable<Void> {
A,
B,
C {
@Override
public synchronized Void call() throws Exception {
if (running.get(this).get() != 0) {
System.out.println("FAIL!");
}
return super.call();
}
};
// How many of each are running.
static Map<Task, AtomicInteger> running = Stream.of(Task.values())
.collect(
Collectors.toMap(
(t) -> t,
(t) -> new AtomicInteger(0),
(x, y) -> x,
() -> new EnumMap<Task, AtomicInteger>(Task.class)));
// List all running tasks.
private String runningList() {
StringBuilder s = new StringBuilder();
running.entrySet().stream().forEach((r) -> {
if (r.getValue().get() != 0) {
s.append(r.getKey()).append("=").append(r.getValue()).append(",");
}
});
return s.toString();
}
static final Random random = new Random();
@Override
public Void call() throws Exception {
System.out.println("Running " + name() + " with " + runningList());
// Mark me running.
running.get(this).getAndIncrement();
// Hang around for a bit.
Thread.sleep(random.nextInt(1000));
// Mark me not running.
running.get(this).getAndDecrement();
return null;
}
}
// The pool.
static ExecutorService pool = Executors.newFixedThreadPool(5);
// The tasks.
static Task[] tasks = new Task[]{Task.A, Task.B, Task.C, Task.B, Task.C, Task.A, Task.B, Task.C,};
public void test() throws InterruptedException {
// Run 10 times.
for (int i = 0; i < 10; i++) {
pool.invokeAll(Arrays.asList(tasks));
}
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
}
感谢大家 support.Anyhow 我想出了一个目前运行良好的解决方案
public class Test {
public void startExecution() {
Queue<String> runQ = new LinkedList<>();
ThreadPool threadPool = new ThreadPool(threadCount,timeOut);
while (!runQ.isEmpty()) {
String TaskName = runQ.remove();
Task t = new Task(TaskName);
threadPool.execute(t, TaskName);
}
if (threadPool.awaitTermination(timeOut, TimeUnit.MINUTES)) {
System.out.println("[CONTROL: ALL TEST TASKS COMPLETED SUCCESSFULLY.]");
} else {
System.out.println("[CONTROL: ALL THE TEST TASKS DID NOT COMPLETE SUCCESSFULLY IN STIPULATED TIME. FORCEFULLY FINALIZING.]");
threadPool.shutdownNow();
}
}
}
线程池实现
public class ThreadPool extends ThreadPoolExecutor {
public ThreadPool(int threadCount, long keepAliveTime) {
super(threadCount, threadCount, keepAliveTime, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
}
final String CTask = "TaskC";
Map<Runnable, String> TaskPool = new HashMap<>();
Queue<Runnable> TaskCList = new LinkedList<>();
@Override
protected synchronized void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
System.out.println(TaskPool.get(r) + "Finished");
if (TaskPool.containsKey(r)) {
TaskPool.remove(r);
}
if (TaskCList.isEmpty()) {
super.shutdown();
}
if (!TaskPool.containsValue(CTask) && !TaskCList.isEmpty()) {
if (super.getActiveCount() < super.getCorePoolSize()) {
System.out.println("Trying to execute Other C Tasks");
Runnable ieRun = TaskCList.remove();
super.execute(ieRun);
TaskPool.put(ieRun, CTask);
}
}
}
public synchronized void execute(Runnable command, String TaskType) {
if (TaskPool.containsValue(TaskType)
&& TaskType.equalsIgnoreCase(CTask)) {
System.out.println("Another Instance of TaskC Running");
System.out.println("Added for future Execution");
TaskCList.add(command);
} else {
System.out.println("Adding " + TaskType + " to execution");
TaskPool.put(command, TaskType);
super.execute(command);
}
}
我有 3 种类型的任务:A, B, C
。
而且我想 运行 在 N 个线程中并行执行这些任务。假设任务列表如下:
A, B, C, B, C, A, B, C
当然,我可以使用ExecutorService
实现多线程执行,但问题是我一次最多需要执行一个C
类型的任务。 类型 C
的其他任务必须按顺序执行,但与任务 A
and/or B
.
例如3线程执行器可能处于以下任何状态:
A B C
A A A
A C B
B B C
B B B
B C
A C
A B
C
...
(允许同时执行多个A类或B类任务,但最多只能同时执行一个C类任务)
在Java中有什么方法可以做到这一点吗?
更新
这是我想出的方法这是正确的方法吗
在这里,我通过 ExecutorService
执行所有任务,在执行时我会检查是否有任何其他 C 任务是 Running.If 不是我会执行,否则我会将它添加到队列中,该队列将在任何其他任务的成功完成并且我还检查任何 C 任务是 运行ning 或 Not
public class Test {
public void startExecution() {
Queue<String> runQ = new LinkedList<>();
ThreadPool exec = (ThreadPool) Executors.newFixedThreadPool(RunSettings.getRunSettings().getThreadCount());
while (!runQ.isEmpty() && !SystemDefaults.stopExecution.get()) {
String TaskName = runQ.remove();
Task t = new Task(TaskName);
exec.execute(t, TaskName);
}
exec.shutdown();
if (exec.awaitTermination(RunSettings.getRunSettings().getExecutionTimeOut(), TimeUnit.MINUTES)) {
System.out.println("[CONTROL: ALL TEST TASKS COMPLETED SUCCESSFULLY.]");
} else {
System.out.println("[CONTROL: ALL THE TEST TASKS DID NOT COMPLETE SUCCESSFULLY IN STIPULATED TIME. FORCEFULLY FINALIZING.]");
exec.shutdownNow();
}
}
}
我创建的ThreadPool
public class ThreadPool extends ThreadPoolExecutor {
public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
final String CTask = "TaskC";
Map<Runnable, String> TaskPool = new HashMap<>();
Queue<Runnable> TaskCList = new LinkedList<>();
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (TaskPool.containsKey(r)) {
TaskPool.remove(r);
}
if (!TaskPool.containsValue(CTask) && !TaskCList.isEmpty()) {
Runnable ieRun = TaskCList.remove();
super.execute(ieRun);
TaskPool.put(ieRun, CTask);
}
}
public void execute(Runnable command, String TaskType) {
if (TaskPool.containsValue(TaskType)
&& TaskType.equalsIgnoreCase(CTask)) {
System.out.println("Another Instance of " + CTask + " Running");
TaskCList.add(command);
} else {
super.execute(command);
TaskPool.put(command, TaskType);
}
}
}
最简单的方法是创建 2 个执行器:一个单线程用于 C 类型的任务,另一个多线程用于其他类型的任务:
class ExecutorWrapper {
private ExecutorService forC = Executors.newSingleThreadExecutor();
private ExecutorService forAnother = Executors.newFixedThreadPool(THREAD_NUMBER);
public void acceptTask(Runnable r) {
if (r instanceof TaskC) {
forC.execute(r);
} else {
forAnother.execute(r);
}
}
}
现在,任何类型 C 的任务都将在 forC
执行器内部队列中等待,直到另一个此类任务完成。
如果您不希望创建另一个执行程序,则需要实施某种并发控制,这要复杂得多,并且由于可能会出现竞争条件而难以调试。我可以提出解决方案草案,但没有代码:
- 创建一个标志以指示另一个任务 C 是否已在执行,以及另一个任务 C 将等待的队列
- 当类型C的任务到达时,检查是否有另一个任务C正在执行,如果是,则将其加入上述队列
- 在任务 C 完成时,发送某种任务 C 已完成的通知——以便从提到的队列中取出下一个任务 C 并将其发送到执行器。如果队列为空,则清除标志以指示现在没有任务 C 正在执行。这种通知可以通过用
Callable
包装任务 C 并调用Future#get
方法来实现,该方法将阻塞直到任务完成。
看起来你需要做的就是制作任务 C synchronized
。这里有一些测试代码似乎证明这就足够了——尽管没有失败并不总是意味着成功。
结果清楚地显示了 A 和 B 的 运行 是平行的,但 C 没有。
static enum Task implements Callable<Void> {
A,
B,
C {
@Override
public synchronized Void call() throws Exception {
if (running.get(this).get() != 0) {
System.out.println("FAIL!");
}
return super.call();
}
};
// How many of each are running.
static Map<Task, AtomicInteger> running = Stream.of(Task.values())
.collect(
Collectors.toMap(
(t) -> t,
(t) -> new AtomicInteger(0),
(x, y) -> x,
() -> new EnumMap<Task, AtomicInteger>(Task.class)));
// List all running tasks.
private String runningList() {
StringBuilder s = new StringBuilder();
running.entrySet().stream().forEach((r) -> {
if (r.getValue().get() != 0) {
s.append(r.getKey()).append("=").append(r.getValue()).append(",");
}
});
return s.toString();
}
static final Random random = new Random();
@Override
public Void call() throws Exception {
System.out.println("Running " + name() + " with " + runningList());
// Mark me running.
running.get(this).getAndIncrement();
// Hang around for a bit.
Thread.sleep(random.nextInt(1000));
// Mark me not running.
running.get(this).getAndDecrement();
return null;
}
}
// The pool.
static ExecutorService pool = Executors.newFixedThreadPool(5);
// The tasks.
static Task[] tasks = new Task[]{Task.A, Task.B, Task.C, Task.B, Task.C, Task.A, Task.B, Task.C,};
public void test() throws InterruptedException {
// Run 10 times.
for (int i = 0; i < 10; i++) {
pool.invokeAll(Arrays.asList(tasks));
}
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
}
感谢大家 support.Anyhow 我想出了一个目前运行良好的解决方案
public class Test {
public void startExecution() {
Queue<String> runQ = new LinkedList<>();
ThreadPool threadPool = new ThreadPool(threadCount,timeOut);
while (!runQ.isEmpty()) {
String TaskName = runQ.remove();
Task t = new Task(TaskName);
threadPool.execute(t, TaskName);
}
if (threadPool.awaitTermination(timeOut, TimeUnit.MINUTES)) {
System.out.println("[CONTROL: ALL TEST TASKS COMPLETED SUCCESSFULLY.]");
} else {
System.out.println("[CONTROL: ALL THE TEST TASKS DID NOT COMPLETE SUCCESSFULLY IN STIPULATED TIME. FORCEFULLY FINALIZING.]");
threadPool.shutdownNow();
}
}
}
线程池实现
public class ThreadPool extends ThreadPoolExecutor {
public ThreadPool(int threadCount, long keepAliveTime) {
super(threadCount, threadCount, keepAliveTime, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
}
final String CTask = "TaskC";
Map<Runnable, String> TaskPool = new HashMap<>();
Queue<Runnable> TaskCList = new LinkedList<>();
@Override
protected synchronized void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
System.out.println(TaskPool.get(r) + "Finished");
if (TaskPool.containsKey(r)) {
TaskPool.remove(r);
}
if (TaskCList.isEmpty()) {
super.shutdown();
}
if (!TaskPool.containsValue(CTask) && !TaskCList.isEmpty()) {
if (super.getActiveCount() < super.getCorePoolSize()) {
System.out.println("Trying to execute Other C Tasks");
Runnable ieRun = TaskCList.remove();
super.execute(ieRun);
TaskPool.put(ieRun, CTask);
}
}
}
public synchronized void execute(Runnable command, String TaskType) {
if (TaskPool.containsValue(TaskType)
&& TaskType.equalsIgnoreCase(CTask)) {
System.out.println("Another Instance of TaskC Running");
System.out.println("Added for future Execution");
TaskCList.add(command);
} else {
System.out.println("Adding " + TaskType + " to execution");
TaskPool.put(command, TaskType);
super.execute(command);
}
}