JAVA 线程池重用线程
JAVA Thread pool reusing threads
各位。
我对使用线程池有误解。实际结果与此 class 的 API 描述不同。当我在线程池中使用 LinkedBlockedQueue
时,它不会重用线程,线程池等待在构造函数中设置的 KeepAliveTime,然后终止该线程并创建一个新线程。当我将 KeepAliveTime 设置得较小时,例如 1 秒或更短,它会删除线程并重新创建它,但是如果我设置一分钟,则不会创建新线程,因为 MaxPoolSize
不允许它并且队列已经满所以所有任务都被拒绝,但是这次 keepAliveTime 设置分钟的线程什么都不做。我很新,不明白为什么它不重用这些线程。 keepTimeAlive
到期后,它会终止这些线程,如果队列已满,则会创建一个新线程。为什么会这样?据我从 API 了解到,如果线程在 keepAliveTime 期间处于空闲状态,它必须重用它。当我使用 SynchronousQueue
但不是 LinkedBlockingQueue
.
时它会重用线程
public class Main {
private volatile int remainingTasksCount;
private volatile static ThreadPoolExecutor consumer = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));
private static Runnable task = () -> {
System.out.println(String.format("consumer %s, id %s, size %s, active count %s, queue %s",
Thread.currentThread().getName(), Thread.currentThread().getId(),
consumer.getPoolSize(), consumer.getActiveCount(), 3-consumer.getQueue().remainingCapacity()));
String s = new String();
synchronized (s) {
try {
s.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
public static void main(String[] args) throws IOException {
try {
new Thread(() -> {
while (true) {
try {
for (int i = 0; i < 5; i++) {
consumer.submit(task);
}
System.out.println("PUSH TASKS");
synchronized (Thread.currentThread()) {
Thread.currentThread().wait(10000);
}
} catch (Throwable th) {
System.out.println(th);
}
}
}).start();
} catch (Throwable th) {
System.out.println(th);
}
}
输出
PUSH TASKS
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 0
Disconnected from the target VM, address: '127.0.0.1:64434', transport: 'socket'
Process finished with exit code 1
但是下次生产者提交任务时,我得到RejectedExecutionException
如果我将 keepAliveTime
更改为 1 Second
。一切都运作良好,但创造
新话题。
PUSH TASKS
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
consumer pool-1-thread-3, id 17, size 1, active count 1, queue 2
PUSH TASKS
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 2
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
如果有人能解释我的错误,或者我错过的一些基本原则,我会很高兴
这是一个竞争条件。如果您遵循 submit()
足够长的时间(在源代码中),您将到达 ThreadPoolExecutor.execucte()
:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* long comment block removed */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
当您的 submit
-loop 运行 是第一次时,execute
将创建新的工作人员,并向他们提供您的任务,而不是尝试将他们推入队列(addWorker
+return
), 所以 2 个任务立即开始,3 个进入可以容纳所有 3 个任务的队列。
第二次,submit
-s 将以 workQueue.offer
结束,这可能会使队列饱和(取决于工作人员尝试使用新项目的速度),当它这样做时,最后的努力 addWorker
将 运行 失败,导致 reject
,因为不允许创建新的工人。
实际上,如果您开始在提交循环中执行 'things',它最终会开始工作。例如,我尝试 println(i)
,但速度很慢,无法完成一些任务并导致循环成功。当我尝试 print(i)
时已经太快了,它在第 4 次提交时就死了,所以没有任务很快被消耗掉。所以这是一个微妙的问题,通常是什么竞争条件。
我认为由于您的代码示例,您对线程池的工作原理有一些误解。我尝试 运行 它并从 5 个任务和无限数量的 RejectedExecutionException 中获取输出。发生这种情况是因为在异常情况下 Thread.currentThread().wait(10000);
没有被调用并且 5 个任务被添加到池中并且这个逻辑一次又一次地重复产生新的异常。尝试包围consumer.submit(task);使用 try-catch 块,您会看到只有两个线程按预期处理所有任务,因为 keepTimeAlive
比等待时间长。在第二个示例中,keepTimeAlive
比等待时间短,因此在每次等待后都会创建新的非核心线程,并且在每次循环调用后您会看到不同的 ID。这是正确的,因为先前的非核心线程已停止,因为它空闲的时间超过 keepTimeAlive
.
各位。
我对使用线程池有误解。实际结果与此 class 的 API 描述不同。当我在线程池中使用 LinkedBlockedQueue
时,它不会重用线程,线程池等待在构造函数中设置的 KeepAliveTime,然后终止该线程并创建一个新线程。当我将 KeepAliveTime 设置得较小时,例如 1 秒或更短,它会删除线程并重新创建它,但是如果我设置一分钟,则不会创建新线程,因为 MaxPoolSize
不允许它并且队列已经满所以所有任务都被拒绝,但是这次 keepAliveTime 设置分钟的线程什么都不做。我很新,不明白为什么它不重用这些线程。 keepTimeAlive
到期后,它会终止这些线程,如果队列已满,则会创建一个新线程。为什么会这样?据我从 API 了解到,如果线程在 keepAliveTime 期间处于空闲状态,它必须重用它。当我使用 SynchronousQueue
但不是 LinkedBlockingQueue
.
public class Main {
private volatile int remainingTasksCount;
private volatile static ThreadPoolExecutor consumer = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));
private static Runnable task = () -> {
System.out.println(String.format("consumer %s, id %s, size %s, active count %s, queue %s",
Thread.currentThread().getName(), Thread.currentThread().getId(),
consumer.getPoolSize(), consumer.getActiveCount(), 3-consumer.getQueue().remainingCapacity()));
String s = new String();
synchronized (s) {
try {
s.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
public static void main(String[] args) throws IOException {
try {
new Thread(() -> {
while (true) {
try {
for (int i = 0; i < 5; i++) {
consumer.submit(task);
}
System.out.println("PUSH TASKS");
synchronized (Thread.currentThread()) {
Thread.currentThread().wait(10000);
}
} catch (Throwable th) {
System.out.println(th);
}
}
}).start();
} catch (Throwable th) {
System.out.println(th);
}
}
输出
PUSH TASKS
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 0
Disconnected from the target VM, address: '127.0.0.1:64434', transport: 'socket'
Process finished with exit code 1
但是下次生产者提交任务时,我得到RejectedExecutionException
如果我将 keepAliveTime
更改为 1 Second
。一切都运作良好,但创造
新话题。
PUSH TASKS
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
consumer pool-1-thread-3, id 17, size 1, active count 1, queue 2
PUSH TASKS
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 2
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
如果有人能解释我的错误,或者我错过的一些基本原则,我会很高兴
这是一个竞争条件。如果您遵循 submit()
足够长的时间(在源代码中),您将到达 ThreadPoolExecutor.execucte()
:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* long comment block removed */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
当您的 submit
-loop 运行 是第一次时,execute
将创建新的工作人员,并向他们提供您的任务,而不是尝试将他们推入队列(addWorker
+return
), 所以 2 个任务立即开始,3 个进入可以容纳所有 3 个任务的队列。
第二次,submit
-s 将以 workQueue.offer
结束,这可能会使队列饱和(取决于工作人员尝试使用新项目的速度),当它这样做时,最后的努力 addWorker
将 运行 失败,导致 reject
,因为不允许创建新的工人。
实际上,如果您开始在提交循环中执行 'things',它最终会开始工作。例如,我尝试 println(i)
,但速度很慢,无法完成一些任务并导致循环成功。当我尝试 print(i)
时已经太快了,它在第 4 次提交时就死了,所以没有任务很快被消耗掉。所以这是一个微妙的问题,通常是什么竞争条件。
我认为由于您的代码示例,您对线程池的工作原理有一些误解。我尝试 运行 它并从 5 个任务和无限数量的 RejectedExecutionException 中获取输出。发生这种情况是因为在异常情况下 Thread.currentThread().wait(10000);
没有被调用并且 5 个任务被添加到池中并且这个逻辑一次又一次地重复产生新的异常。尝试包围consumer.submit(task);使用 try-catch 块,您会看到只有两个线程按预期处理所有任务,因为 keepTimeAlive
比等待时间长。在第二个示例中,keepTimeAlive
比等待时间短,因此在每次等待后都会创建新的非核心线程,并且在每次循环调用后您会看到不同的 ID。这是正确的,因为先前的非核心线程已停止,因为它空闲的时间超过 keepTimeAlive
.