JAVA 线程池重用线程

JAVA Thread pool reusing threads

各位。 我对使用线程池有误解。实际结果与此 class 的 API 描述不同。当我在线程池中使用 LinkedBlockedQueue 时,它不会重用线程,线程池等待在构造函数中设置的 KeepAliveTime,然后终止该线程并创建一个新线程。当我将 KeepAliveTime 设置得较小时,例如 1 秒或更短,它会删除线程并重新创建它,但是如果我设置一分钟,则不会创建新线程,因为 MaxPoolSize 不允许它并且队列已经满所以所有任务都被拒绝,但是这次 keepAliveTime 设置分钟的线程什么都不做。我很新,不明白为什么它不重用这些线程。 keepTimeAlive 到期后,它会终止这些线程,如果队列已满,则会创建一个新线程。为什么会这样?据我从 API 了解到,如果线程在 keepAliveTime 期间处于空闲状态,它必须重用它。当我使用 SynchronousQueue 但不是 LinkedBlockingQueue.

时它会重用线程
public class Main {

    private volatile int remainingTasksCount;
    private volatile static ThreadPoolExecutor consumer = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3));

    private static Runnable task = () -> {
        System.out.println(String.format("consumer %s, id %s, size %s, active count %s, queue %s",
                Thread.currentThread().getName(), Thread.currentThread().getId(),
                consumer.getPoolSize(), consumer.getActiveCount(), 3-consumer.getQueue().remainingCapacity()));
        String s = new String();
        synchronized (s) {
            try {
                s.wait(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };


    public static void main(String[] args) throws IOException {
        try {
            new Thread(() -> {
                while (true) {
                    try {
                        for (int i = 0; i < 5; i++) {
                            consumer.submit(task);
                        }
                        System.out.println("PUSH TASKS");
                        synchronized (Thread.currentThread()) {
                            Thread.currentThread().wait(10000);
                        }
                    } catch (Throwable th) {
                        System.out.println(th);
                    }
                }
            }).start();
        } catch (Throwable th) {
            System.out.println(th);
        }
    }  

输出

PUSH TASKS
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 1, queue 0
Disconnected from the target VM, address: '127.0.0.1:64434', transport: 'socket'

Process finished with exit code 1

但是下次生产者提交任务时,我得到RejectedExecutionException

如果我将 keepAliveTime 更改为 1 Second。一切都运作良好,但创造 新话题。

PUSH TASKS
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 3
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-1, id 15, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 3
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 2
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-2, id 16, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
consumer pool-1-thread-3, id 17, size 1, active count 1, queue 2
PUSH TASKS
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-4, id 18, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0
PUSH TASKS
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 2
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 3
consumer pool-1-thread-3, id 17, size 2, active count 2, queue 1
consumer pool-1-thread-5, id 19, size 2, active count 2, queue 1
consumer pool-1-thread-3, id 17, size 2, active count 1, queue 0

如果有人能解释我的错误,或者我错过的一些基本原则,我会很高兴

这是一个竞争条件。如果您遵循 submit() 足够长的时间(在源代码中),您将到达 ThreadPoolExecutor.execucte():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /* long comment block removed */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

当您的 submit-loop 运行 是第一次时,execute 将创建新的工作人员,并向他们提供您的任务,而不是尝试将他们推入队列(addWorker+return), 所以 2 个任务立即开始,3 个进入可以容纳所有 3 个任务的队列。

第二次,submit-s 将以 workQueue.offer 结束,这可能会使队列饱和(取决于工作人员尝试使用新项目的速度),当它这样做时,最后的努力 addWorker 将 运行 失败,导致 reject,因为不允许创建新的工人。

实际上,如果您开始在提交循环中执行 'things',它最终会开始工作。例如,我尝试 println(i),但速度很慢,无法完成一些任务并导致循环成功。当我尝试 print(i) 时已经太快了,它在第 4 次提交时就死了,所以没有任务很快被消耗掉。所以这是一个微妙的问题,通常是什么竞争条件。

我认为由于您的代码示例,您对线程池的工作原理有一些误解。我尝试 运行 它并从 5 个任务和无限数量的 RejectedExecutionException 中获取输出。发生这种情况是因为在异常情况下 Thread.currentThread().wait(10000); 没有被调用并且 5 个任务被添加到池中并且这个逻辑一次又一次地重复产生新的异常。尝试包围consumer.submit(task);使用 try-catch 块,您会看到只有两个线程按预期处理所有任务,因为 keepTimeAlive 比等待时间长。在第二个示例中,keepTimeAlive 比等待时间短,因此在每次等待后都会创建新的非核心线程,并且在每次循环调用后您会看到不同的 ID。这是正确的,因为先前的非核心线程已停止,因为它空闲的时间超过 keepTimeAlive.