同步列表 - 添加多个线程并删除一个线程

Synchronized List - add multiple threads and remove one thread

我有这个代码:

private static boolean flagStop = false;

//synchronized list
private static List<Object> queue = Collections.synchronizedList(new ArrayList<>());

//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
    while (!flagStop) {
        if (!queue.isEmpty()) {
            int size = queue.size();
            List<Object> copyList = queue.subList(0, size);
            for (int i = 0; i < size; i++) {
                queue.remove(0);
            }
            copyList.forEach(System.out::println);
        }
        try {
            Thread.sleep(1_000);
        }
        catch (Exception ignored) { }
    }
});

static {
    printQueueThread.start();
}

//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj) {
    queue.add(obj);
}

在这个 class 中,我有一个同步列表,我用它来添加来自不同线程的元素,并在另一个线程中打印和删除每个元素。

我的观点是不要在 addElement 停止线程并安全地处理(打印和删除)元素。

我在这里发帖询问我的代码是否线程安全或如何检查它。

在我看来 List 不是您要找的 class。 Java 实际上有一个 Queue class (以及它的线程安全实现)。

然后,进一步查看您的代码,我发现它非常浪费; 您正在创建列表的副本,从原始对象中删除对象,然后打印副本。 这似乎是一个 CopyOnWriteArrayList 但对我来说线程安全性较低。这可能很有用,但队列解决方案最适合这种情况并且性能更高。

这是您的代码在更改为使用 BlockingQueue:

后的样子
private static boolean flagStop = false;

//synchronized list
private static Queue<Object> queue = new BlockingQueue<Object>();

//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
    Object currentObject;
    while(!flagStop) {
            while((object = queue.poll()) != null) {
                System.out.println(currentObject);
            }
        }
        try { 
            Thread.sleep(1_000); 
        } catch (Exception ignored) {
        }
    }
});

static{
    printQueueThread.start();
}

//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj){
    queue.offer(obj);
}

另一方面,您还使用布尔值来告诉您的线程是否应该停止。这可能会导致同步问题。你真的应该用 AtomicBoolean 代替它 - 布尔值的线程安全实现。 将 boolean flagStop = false 更改为 AtomicBoolean flagStop = new AtomicBoolean(),将 while (!flagStop) 更改为 while(!flagStop.get())

我不太确定为什么你的 class 也是静态的,但这通常是不好的做法。我会避免这种情况并使用实例。 我还建议实现 Runnable,使代码更清晰,并在 addElement 中添加空检查。 通过所有更正,您的最终 class 将如下所示:

public class MyPrintQueue implements Runnable {
    private AtomicBoolean shouldStop;

    //synchronized list
    private Queue<Object> queue;
    
    //thread which check once per second the size of queue
    //and print elements from list and try to remove them safely
    private Thread printQueueThread;
    
    public MyPrintQueue() {
        this.queue = new BlockingQueue<>();
        this.printQueueThread = new Thread(this);
    }

    @Override
    public void run() {
        Object currentObject;
        while(!flagStop.get()) {
                while((object = queue.poll()) != null) {
                    System.out.println(currentObject);
                }
            }
            try { 
                Thread.sleep(1_000); 
            } catch (Exception ignored) {
                // Ignored
            }
        }
    }

    public void startThread() {
        printQueueThread.start();
    }

    public void stop() {
        flagStop.set(true);
    }

    //add elements in queue
    //this is accessed by multiple threads, by few times per second.
    public void addElement(Object obj){
        if (obj == null)
            return;
        queue.offer(obj);
    }
}

也就是说,您的代码对我来说似乎不是完全线程安全的。我可能是错的,但您可以删除 Thread.sleep 调用并创建一个 TestClass,它在 while true 中调用 addElement() 来检查是否曾抛出任何异常。 如果您不想使用队列并且必须使用列表,则应将 Collections.synchronizedList 替换为 CopyOnWriteArrayList<>(),有效地将您的代码更改为此

private static boolean flagStop = false;

//synchronized list
private static List<Object> queue = new CopyOnWriteArrayList<>();

//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {

    while(!flagStop){

        if(!queue.isEmpty()){
            while (queue.size() > 0) {
                System.out.println(queue.get(0));
                queue.remove(0);
            }
        }

        try{ Thread.sleep(1_000); }catch(Exception ignored){}

    }

});

static{
    printQueueThread.start();
}

//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj){
    queue.add(obj);
}

请注意,上面的代码仍然是静态的 (!),并且没有实现 AtomicBoolean。 另请查看以下似乎与您的问题相关的问题及其答案: