ThreadPoolExecutor 的线程是否不与 PriorityBlockingQueue 同时运行
Does threads of ThreadPoolExecutor not runs concurrently using with PriorityBlockingQueue
我正在使用 java ThreadPoolExecutor 来 运行 并发线程执行。我使用 ArrayBlockingQueue 将线程保持在队列中。但现在要求已经改变,我需要添加线程 运行 时间(无大小限制),它应该被优先考虑。
所以我决定使用 PriorityBlockingQueue 而不是 ArrayBlockingQueue 和一些比较逻辑。
使用 PriorityBlockingQueue 后,线程 运行 一个接一个地顺序排列,而不是并发。一次只有一个线程 运行,而不是任何活动线程数。
如果有人有任何建议来解决这个问题并达到我的要求,请告诉我(线程应该在 运行 时间添加到池中,并且它的执行应该基于优先级)。
我的演示代码:
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
BlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>(50, ThreadComparator.getComparator());
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(1, activeThread, 10, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
//start the monitoring thread
MyMonitorThread monitor = new MyMonitorThread(executorPool, 20, "Demo");
Thread monitorThread = new Thread(monitor);
monitorThread.start();
for (int i = 0; i < totalThead; i++) {
int prio = i % 3 == 0 ? 3 : 5;
executorPool.execute(new MyThread("Thread-" + i, prio));
}
// Inserting more threads in between concurrent execution.
try {
Thread.sleep(40000);
for (int j = 101; j < 110; j++) {
executorPool.execute(new MyThread("Thread-" + j, 2));
}
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
while(executorPool.getActiveCount() != 0) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Error while thread sleeping: " + e);
}
}
//shut down the pool
executorPool.shutdown();
//shut down the monitor thread
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println("Error while thread sleeping: " + e);
}
monitor.shutdown();
public abstract class ThreadComparator implements Comparator<Runnable>{
public static Comparator<Runnable> getComparator() {
return new Comparator<Runnable>() {
@Override
public int compare(Runnable t1, Runnable t2) {
CompareToBuilder compare = new CompareToBuilder();
MyThread mt1 = (MyThread) t1;
MyThread mt2 = (MyThread) t2;
compare.append(mt1.getPriority(), mt2.getPriority());
return compare.toComparison();
}
};
}
}
这是具有无限工作队列的 ThreadPoolExecutor
的预期行为。
引用 ThreadPoolExecutor
JavaDoc:
Core and maximum pool sizes
A ThreadPoolExecutor will automatically adjust the pool size [..].
When a new task is submitted in method execute(Runnable), and fewer
than corePoolSize threads are running, a new thread is created to
handle the request, even if other worker threads are idle. If there
are more than corePoolSize but less than maximumPoolSize threads
running, a new thread will be created only if the queue is full. [...]
由于您将 corePoolSize
定义为 1
并且 PriorityBlockingQueue
本质上是一个无界队列(永远不会变满),因此您永远不会有一个以上的线程。
解决方法是将 corePoolSize
调整为所需的线程数。
我正在使用 java ThreadPoolExecutor 来 运行 并发线程执行。我使用 ArrayBlockingQueue 将线程保持在队列中。但现在要求已经改变,我需要添加线程 运行 时间(无大小限制),它应该被优先考虑。 所以我决定使用 PriorityBlockingQueue 而不是 ArrayBlockingQueue 和一些比较逻辑。 使用 PriorityBlockingQueue 后,线程 运行 一个接一个地顺序排列,而不是并发。一次只有一个线程 运行,而不是任何活动线程数。 如果有人有任何建议来解决这个问题并达到我的要求,请告诉我(线程应该在 运行 时间添加到池中,并且它的执行应该基于优先级)。
我的演示代码:
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
BlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>(50, ThreadComparator.getComparator());
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(1, activeThread, 10, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
//start the monitoring thread
MyMonitorThread monitor = new MyMonitorThread(executorPool, 20, "Demo");
Thread monitorThread = new Thread(monitor);
monitorThread.start();
for (int i = 0; i < totalThead; i++) {
int prio = i % 3 == 0 ? 3 : 5;
executorPool.execute(new MyThread("Thread-" + i, prio));
}
// Inserting more threads in between concurrent execution.
try {
Thread.sleep(40000);
for (int j = 101; j < 110; j++) {
executorPool.execute(new MyThread("Thread-" + j, 2));
}
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
while(executorPool.getActiveCount() != 0) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Error while thread sleeping: " + e);
}
}
//shut down the pool
executorPool.shutdown();
//shut down the monitor thread
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println("Error while thread sleeping: " + e);
}
monitor.shutdown();
public abstract class ThreadComparator implements Comparator<Runnable>{
public static Comparator<Runnable> getComparator() {
return new Comparator<Runnable>() {
@Override
public int compare(Runnable t1, Runnable t2) {
CompareToBuilder compare = new CompareToBuilder();
MyThread mt1 = (MyThread) t1;
MyThread mt2 = (MyThread) t2;
compare.append(mt1.getPriority(), mt2.getPriority());
return compare.toComparison();
}
};
}
}
这是具有无限工作队列的 ThreadPoolExecutor
的预期行为。
引用 ThreadPoolExecutor
JavaDoc:
Core and maximum pool sizes
A ThreadPoolExecutor will automatically adjust the pool size [..]. When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. [...]
由于您将 corePoolSize
定义为 1
并且 PriorityBlockingQueue
本质上是一个无界队列(永远不会变满),因此您永远不会有一个以上的线程。
解决方法是将 corePoolSize
调整为所需的线程数。