线程池任务列表更新问题

Thread Pool task list update issue

我正在尝试在 java 中设计一个线程池。根据我的设计,我在主运行程序线程 class 中使用 java 的链表 DS 来保留所有提交的任务。此任务列表正在从主 class 更新,其中主 class 正在向任务列表添加任务。在我的主运行线程中,我 运行 一个 while 循环并不断检查 LinkedList 是否不为空,如果它包含一个任务,那么我正在检索任务并执行它。

这里的问题是我从我的 main 方法中添加了一个任务到任务列表中,我可以从 main 方法中看到这个任务列表的大小为 1,但是当我打印大小时在运行线程中任务列表对象,它显示为 0.

需要帮助弄清楚这里到底发生了什么。

public class ReusableThread<T> extends Thread{
    private volatile Queue<Work<T>> tasks = new LinkedList<Work<T>>();
    private Work<T> currentWork;

    private class Work<T>{
        Result<T> result;
        Taskable<T> task;

        public Work(Result<T> result, Taskable<T> task) {
            this.result = result;
            this.task = task;
        }
    }

    @Override
    public void run() {
        while(true){
            //System.out.println("ReusableThread.run()");
            System.out.println("Inside thread : " + getTasks().size()); //This print 0
            if(!tasks.isEmpty()){      
               currentWork = getWork();
               T value = currentWork.task.run();

               //currentWork.result.setValue(currentWork.task.run());
            }
            //currentWork.result.setComplete(true);
        }
    }

    public Work<T> getWork() {
        return tasks.remove();
    }

    public Queue<Work<T>> getTasks() {
        return tasks;
    }

    public Result<T> submit(Taskable<T> task) {
        Result<T> result = new Result<T>();
        this.tasks.add(new Work<T>(result, task));
        return result;
    }
}

主线程如下:

public void test() throws InterruptedException {
    int count = 0;  
    ReusableThread<Integer> rt = new ReusableThread();
    rt.start();

    Thread.sleep(1000);

        System.out.println("Thread-"+ count +" starting");
        Result<Integer> result = rt.submit(JavaUtils::task);
        System.out.println("In main : " + rt.getTasks().size()); //This prints 1
}

我认为存在线程安全问题。具体来说:

  1. a LinkedList 不是线程安全的,并且
  2. 您正在 rt.getTasks().size() 中使用 LinkedList 对象,但没有任何同步。

这足以导致 size() 在某些情况下成为 return 过时值。


如果您要依赖 volatile 的语义,您需要对每个写入/读取序列的 happens-before 关系进行适当的分析对线程安全很重要。这很棘手。

我的建议是:

  1. 不要使用 volatile。使用 synchronized and/or 现有的线程安全数据结构来代替...如果您 需要 重新发明轮子。

  2. 不要重新发明轮子。您可以用一次调用替换您的线程池 Executors.singleThreadExecutor;见 javadoc.

你缺少同步'tasks',所以两个线程(main和ReusableThread)随机访问tasks,所以你不会知道发生了什么,我修改了你的代码:

import java.util.LinkedList;
import java.util.Queue;

class Result<T> {
}

interface Taskable<T> {

    T run();

}

class JavaUtils {

    public static Integer task() {
        return 1;
    }

}

public class ReusableThread<T> extends Thread{
    private volatile Queue<Work<T>> tasks = new LinkedList<Work<T>>();
    private Work<T> currentWork;

    private class Work<T>{
        Result<T> result;
        Taskable<T> task;

        public Work(Result<T> result, Taskable<T> task) {
            this.result = result;
            this.task = task;
        }
    }

    @Override
    public void run() {
        while(true){
            //System.out.println("ReusableThread.run()");
            synchronized (tasks) {
                if (!tasks.isEmpty()) {
                    System.out.println("Inside thread : " + tasks.size()); //This print 0
                    currentWork = tasks.remove();
                    T value = currentWork.task.run();

                    //currentWork.result.setValue(currentWork.task.run());
                }
            }
            //currentWork.result.setComplete(true);
        }
    }

    public Work<T> getWork() {
        synchronized (tasks) {
            return tasks.remove();
        }
    }

    public Queue<Work<T>> getTasks() {
        synchronized (tasks) {
            return tasks;
        }
    }

    public int getTaskSize() {
        synchronized (tasks) {
            return tasks.size();
        }
    }

    public Result<T> submit(Taskable<T> task) {
        Result<T> result = new Result<T>();
        synchronized (tasks) {
            this.tasks.add(new Work<T>(result, task));
        }
        return result;
    }

    public static void test() throws InterruptedException {
        int count = 0;
        ReusableThread<Integer> rt = new ReusableThread();
        rt.start();

        Thread.sleep(10);

        System.out.println("Thread-"+ count +" starting");
        Result<Integer> result = rt.submit(JavaUtils::task);
        System.out.println("In main : " + rt.getTaskSize()); //This prints 1
    }

    public static void main(String[] args) throws Exception {
        test();
    }
}

并且您应该将 Thread.sleep() 添加到 while(true) 循环中,否则您将很快获得 100% cpu