同步列表 - 添加多个线程并删除一个线程
Synchronized List - add multiple threads and remove one thread
我有这个代码:
private static boolean flagStop = false;
//synchronized list
private static List<Object> queue = Collections.synchronizedList(new ArrayList<>());
//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
while (!flagStop) {
if (!queue.isEmpty()) {
int size = queue.size();
List<Object> copyList = queue.subList(0, size);
for (int i = 0; i < size; i++) {
queue.remove(0);
}
copyList.forEach(System.out::println);
}
try {
Thread.sleep(1_000);
}
catch (Exception ignored) { }
}
});
static {
printQueueThread.start();
}
//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj) {
queue.add(obj);
}
在这个 class 中,我有一个同步列表,我用它来添加来自不同线程的元素,并在另一个线程中打印和删除每个元素。
我的观点是不要在 addElement
停止线程并安全地处理(打印和删除)元素。
我在这里发帖询问我的代码是否线程安全或如何检查它。
在我看来 List
不是您要找的 class。 Java 实际上有一个 Queue
class (以及它的线程安全实现)。
然后,进一步查看您的代码,我发现它非常浪费;
您正在创建列表的副本,从原始对象中删除对象,然后打印副本。
这似乎是一个 CopyOnWriteArrayList 但对我来说线程安全性较低。这可能很有用,但队列解决方案最适合这种情况并且性能更高。
这是您的代码在更改为使用 BlockingQueue
:
后的样子
private static boolean flagStop = false;
//synchronized list
private static Queue<Object> queue = new BlockingQueue<Object>();
//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
Object currentObject;
while(!flagStop) {
while((object = queue.poll()) != null) {
System.out.println(currentObject);
}
}
try {
Thread.sleep(1_000);
} catch (Exception ignored) {
}
}
});
static{
printQueueThread.start();
}
//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj){
queue.offer(obj);
}
另一方面,您还使用布尔值来告诉您的线程是否应该停止。这可能会导致同步问题。你真的应该用 AtomicBoolean 代替它 - 布尔值的线程安全实现。
将 boolean flagStop = false
更改为 AtomicBoolean flagStop = new AtomicBoolean()
,将 while (!flagStop)
更改为 while(!flagStop.get())
。
我不太确定为什么你的 class 也是静态的,但这通常是不好的做法。我会避免这种情况并使用实例。
我还建议实现 Runnable,使代码更清晰,并在 addElement 中添加空检查。
通过所有更正,您的最终 class 将如下所示:
public class MyPrintQueue implements Runnable {
private AtomicBoolean shouldStop;
//synchronized list
private Queue<Object> queue;
//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private Thread printQueueThread;
public MyPrintQueue() {
this.queue = new BlockingQueue<>();
this.printQueueThread = new Thread(this);
}
@Override
public void run() {
Object currentObject;
while(!flagStop.get()) {
while((object = queue.poll()) != null) {
System.out.println(currentObject);
}
}
try {
Thread.sleep(1_000);
} catch (Exception ignored) {
// Ignored
}
}
}
public void startThread() {
printQueueThread.start();
}
public void stop() {
flagStop.set(true);
}
//add elements in queue
//this is accessed by multiple threads, by few times per second.
public void addElement(Object obj){
if (obj == null)
return;
queue.offer(obj);
}
}
也就是说,您的代码对我来说似乎不是完全线程安全的。我可能是错的,但您可以删除 Thread.sleep 调用并创建一个 TestClass,它在 while true
中调用 addElement() 来检查是否曾抛出任何异常。
如果您不想使用队列并且必须使用列表,则应将 Collections.synchronizedList 替换为 CopyOnWriteArrayList<>(),有效地将您的代码更改为此
private static boolean flagStop = false;
//synchronized list
private static List<Object> queue = new CopyOnWriteArrayList<>();
//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
while(!flagStop){
if(!queue.isEmpty()){
while (queue.size() > 0) {
System.out.println(queue.get(0));
queue.remove(0);
}
}
try{ Thread.sleep(1_000); }catch(Exception ignored){}
}
});
static{
printQueueThread.start();
}
//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj){
queue.add(obj);
}
请注意,上面的代码仍然是静态的 (!),并且没有实现 AtomicBoolean。
另请查看以下似乎与您的问题相关的问题及其答案:
我有这个代码:
private static boolean flagStop = false;
//synchronized list
private static List<Object> queue = Collections.synchronizedList(new ArrayList<>());
//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
while (!flagStop) {
if (!queue.isEmpty()) {
int size = queue.size();
List<Object> copyList = queue.subList(0, size);
for (int i = 0; i < size; i++) {
queue.remove(0);
}
copyList.forEach(System.out::println);
}
try {
Thread.sleep(1_000);
}
catch (Exception ignored) { }
}
});
static {
printQueueThread.start();
}
//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj) {
queue.add(obj);
}
在这个 class 中,我有一个同步列表,我用它来添加来自不同线程的元素,并在另一个线程中打印和删除每个元素。
我的观点是不要在 addElement
停止线程并安全地处理(打印和删除)元素。
我在这里发帖询问我的代码是否线程安全或如何检查它。
在我看来 List
不是您要找的 class。 Java 实际上有一个 Queue
class (以及它的线程安全实现)。
然后,进一步查看您的代码,我发现它非常浪费; 您正在创建列表的副本,从原始对象中删除对象,然后打印副本。 这似乎是一个 CopyOnWriteArrayList 但对我来说线程安全性较低。这可能很有用,但队列解决方案最适合这种情况并且性能更高。
这是您的代码在更改为使用 BlockingQueue
:
private static boolean flagStop = false;
//synchronized list
private static Queue<Object> queue = new BlockingQueue<Object>();
//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
Object currentObject;
while(!flagStop) {
while((object = queue.poll()) != null) {
System.out.println(currentObject);
}
}
try {
Thread.sleep(1_000);
} catch (Exception ignored) {
}
}
});
static{
printQueueThread.start();
}
//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj){
queue.offer(obj);
}
另一方面,您还使用布尔值来告诉您的线程是否应该停止。这可能会导致同步问题。你真的应该用 AtomicBoolean 代替它 - 布尔值的线程安全实现。
将 boolean flagStop = false
更改为 AtomicBoolean flagStop = new AtomicBoolean()
,将 while (!flagStop)
更改为 while(!flagStop.get())
。
我不太确定为什么你的 class 也是静态的,但这通常是不好的做法。我会避免这种情况并使用实例。 我还建议实现 Runnable,使代码更清晰,并在 addElement 中添加空检查。 通过所有更正,您的最终 class 将如下所示:
public class MyPrintQueue implements Runnable {
private AtomicBoolean shouldStop;
//synchronized list
private Queue<Object> queue;
//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private Thread printQueueThread;
public MyPrintQueue() {
this.queue = new BlockingQueue<>();
this.printQueueThread = new Thread(this);
}
@Override
public void run() {
Object currentObject;
while(!flagStop.get()) {
while((object = queue.poll()) != null) {
System.out.println(currentObject);
}
}
try {
Thread.sleep(1_000);
} catch (Exception ignored) {
// Ignored
}
}
}
public void startThread() {
printQueueThread.start();
}
public void stop() {
flagStop.set(true);
}
//add elements in queue
//this is accessed by multiple threads, by few times per second.
public void addElement(Object obj){
if (obj == null)
return;
queue.offer(obj);
}
}
也就是说,您的代码对我来说似乎不是完全线程安全的。我可能是错的,但您可以删除 Thread.sleep 调用并创建一个 TestClass,它在 while true
中调用 addElement() 来检查是否曾抛出任何异常。
如果您不想使用队列并且必须使用列表,则应将 Collections.synchronizedList 替换为 CopyOnWriteArrayList<>(),有效地将您的代码更改为此
private static boolean flagStop = false;
//synchronized list
private static List<Object> queue = new CopyOnWriteArrayList<>();
//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
while(!flagStop){
if(!queue.isEmpty()){
while (queue.size() > 0) {
System.out.println(queue.get(0));
queue.remove(0);
}
}
try{ Thread.sleep(1_000); }catch(Exception ignored){}
}
});
static{
printQueueThread.start();
}
//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj){
queue.add(obj);
}
请注意,上面的代码仍然是静态的 (!),并且没有实现 AtomicBoolean。
另请查看以下似乎与您的问题相关的问题及其答案: