java 中的数组在以下情况下是否是线程安全的:一个线程更改值,一个线程读取值?
Is array in java is threadsafe in case : one thread change value, one thread read value?
我写了一个简单的环形缓冲区,在方法 test1() 中我使用了一个线程 poll() 和
一个线程是 offer()。我测试了很多次,但它总是正确的。你能为我解释一下吗?
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@SuppressWarnings("unchecked")
public class RingBuffer<T> {
private T[] buffer;
// private volatile T[] buffer;
private int readIndex;
private int writeIndex;
private final int capacity;
private AtomicInteger size;
public RingBuffer(int k) {
this.buffer = (T[]) new Object[k];
this.capacity = k;
this.readIndex = 0;
this.writeIndex = 0;
this.size = new AtomicInteger(0);
}
public boolean offer(T value) {
if (isFull()) return false;
buffer[writeIndex] = value;
writeIndex++;
if (writeIndex == capacity) writeIndex -= capacity;
size.getAndIncrement();
return true;
}
public T poll() {
if (isEmpty()) return null;
int index = readIndex;
T x = buffer[index];
readIndex++;
if (readIndex == capacity) readIndex -= capacity;
size.getAndDecrement();
return x;
}
public boolean isEmpty() {
return size.get() == 0;
}
public boolean isFull() {
return size.get() == capacity;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
}
}
正如您在 test1() 方法中看到的那样,我使用了不同的线程,但 check
是正确的。
抱歉,因为 Whosebug 警告我这个问题主要是代码,所以我将在此处粘贴 test1() 方法。
public static void test1() throws ExecutionException, InterruptedException {
RingBuffer<String> buffer = new RingBuffer<>(1000);
AtomicBoolean writeDone1 = new AtomicBoolean(false);
ExecutorService service = Executors.newFixedThreadPool(2);
ExecutorService service1 = Executors.newFixedThreadPool(2);
Callable<List<String>> cw1 = () -> {
List<String > x = new ArrayList<>();
int count = 0;
for (int i = 0; i < 10000000; i++) {
if (buffer.offer( i+"")) {
count++;
x.add(i+"");
}
}
writeDone1.set(true);
System.out.println("num write " + count);
return x;
};
Callable<List<String>> cr = () -> {
List<String> x = new ArrayList<>();
int count = 0;
while (!writeDone1.get()) {
String data = buffer.poll();
if (data != null) {
x.add(data);
count++;
}
}
while (true) {
String data = buffer.poll();
if (data != null) {
x.add(data);
count++;
} else {
break;
}
}
System.out.println("num read " + count);
return x;
};
Future<List<String >> fw = service.submit(cw1);
Future<List<String>> fr = service1.submit(cr);
List<String> sw = fw.get();
List<String> sr = fr.get();
System.out.println(sw.size());
System.out.println(sr.size());
boolean check = true;
for (int i =0 ; i< sw.size() ; i++){
if (!sw.get(i).equals( sr.get(i))){
check = false;
break;
}
}
System.out.println(check);
service.shutdown();
service1.shutdown();
}
如果我只使用一个消费者和生产者。我不能在这里写测试使竞争条件。你能帮帮我吗?
谢谢
在某个位置写入数组和从同一位置读取数组之间的边沿之前没有发生。因此,如果您没有任何顺序保证,您的代码就会遭受数据竞争。
如果您还允许并发报价和并发轮询,那么您也会遇到竞争条件。
我玩环形缓冲区已经有一段时间了。但通常你会使用尾部和头部序列(例如长)。如果你使环形缓冲区成为 2 的幂,你可以在序列到索引的转换上做一个便宜的 mod。头部和尾部序列可能是相对昂贵的 volatiles(我真的会从它开始),稍后你可以使用宽松的内存顺序 modes。头部和尾部会给你适当的边缘之前发生的事情,所以不需要对数组做任何特殊的事情。使用这种方法,您还可以摆脱 'size';您可以将大小计算为尾部和头部之间的差异;大小的问题是它会导致线程 read/writing 与环形缓冲区之间的争用。您还需要正确填充 head/tail 字段以防止错误共享。
如果有一个消费者和一个生产者,那么这个RingBuffer
是线程安全的。
Happens-before 由 AtomicInteger size
提供:在 poll()
和 offer()
.offer()
.
的开头读取并写入结尾
例如,让我们看一下poll()
。
请注意:
- 在
poll()
中,只有当我们阅读了 size.get()!=0
时,我们才会阅读 buffer[index]
size.get()!=0
只能发生在 size.getAndIncrement()
之后 offer()
size
是 AtomicInteger
,这意味着它提供 happens-before 并使 offer()
中的所有修改在 [=12= 中可见]
换句话说:
buffer[writeIndex]=value
在 offer()
- -(发生在之前)->
size.getAndIncrement()
在 offer()
- -(发生在之前)->
size.get()!=0
在 poll()
- -(发生在之前)->
T x = buffer[index]
在 poll()
我写了一个简单的环形缓冲区,在方法 test1() 中我使用了一个线程 poll() 和 一个线程是 offer()。我测试了很多次,但它总是正确的。你能为我解释一下吗?
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@SuppressWarnings("unchecked")
public class RingBuffer<T> {
private T[] buffer;
// private volatile T[] buffer;
private int readIndex;
private int writeIndex;
private final int capacity;
private AtomicInteger size;
public RingBuffer(int k) {
this.buffer = (T[]) new Object[k];
this.capacity = k;
this.readIndex = 0;
this.writeIndex = 0;
this.size = new AtomicInteger(0);
}
public boolean offer(T value) {
if (isFull()) return false;
buffer[writeIndex] = value;
writeIndex++;
if (writeIndex == capacity) writeIndex -= capacity;
size.getAndIncrement();
return true;
}
public T poll() {
if (isEmpty()) return null;
int index = readIndex;
T x = buffer[index];
readIndex++;
if (readIndex == capacity) readIndex -= capacity;
size.getAndDecrement();
return x;
}
public boolean isEmpty() {
return size.get() == 0;
}
public boolean isFull() {
return size.get() == capacity;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
}
}
正如您在 test1() 方法中看到的那样,我使用了不同的线程,但 check
是正确的。
抱歉,因为 Whosebug 警告我这个问题主要是代码,所以我将在此处粘贴 test1() 方法。
public static void test1() throws ExecutionException, InterruptedException {
RingBuffer<String> buffer = new RingBuffer<>(1000);
AtomicBoolean writeDone1 = new AtomicBoolean(false);
ExecutorService service = Executors.newFixedThreadPool(2);
ExecutorService service1 = Executors.newFixedThreadPool(2);
Callable<List<String>> cw1 = () -> {
List<String > x = new ArrayList<>();
int count = 0;
for (int i = 0; i < 10000000; i++) {
if (buffer.offer( i+"")) {
count++;
x.add(i+"");
}
}
writeDone1.set(true);
System.out.println("num write " + count);
return x;
};
Callable<List<String>> cr = () -> {
List<String> x = new ArrayList<>();
int count = 0;
while (!writeDone1.get()) {
String data = buffer.poll();
if (data != null) {
x.add(data);
count++;
}
}
while (true) {
String data = buffer.poll();
if (data != null) {
x.add(data);
count++;
} else {
break;
}
}
System.out.println("num read " + count);
return x;
};
Future<List<String >> fw = service.submit(cw1);
Future<List<String>> fr = service1.submit(cr);
List<String> sw = fw.get();
List<String> sr = fr.get();
System.out.println(sw.size());
System.out.println(sr.size());
boolean check = true;
for (int i =0 ; i< sw.size() ; i++){
if (!sw.get(i).equals( sr.get(i))){
check = false;
break;
}
}
System.out.println(check);
service.shutdown();
service1.shutdown();
}
如果我只使用一个消费者和生产者。我不能在这里写测试使竞争条件。你能帮帮我吗?
谢谢
在某个位置写入数组和从同一位置读取数组之间的边沿之前没有发生。因此,如果您没有任何顺序保证,您的代码就会遭受数据竞争。
如果您还允许并发报价和并发轮询,那么您也会遇到竞争条件。
我玩环形缓冲区已经有一段时间了。但通常你会使用尾部和头部序列(例如长)。如果你使环形缓冲区成为 2 的幂,你可以在序列到索引的转换上做一个便宜的 mod。头部和尾部序列可能是相对昂贵的 volatiles(我真的会从它开始),稍后你可以使用宽松的内存顺序 modes。头部和尾部会给你适当的边缘之前发生的事情,所以不需要对数组做任何特殊的事情。使用这种方法,您还可以摆脱 'size';您可以将大小计算为尾部和头部之间的差异;大小的问题是它会导致线程 read/writing 与环形缓冲区之间的争用。您还需要正确填充 head/tail 字段以防止错误共享。
如果有一个消费者和一个生产者,那么这个RingBuffer
是线程安全的。
Happens-before 由 AtomicInteger size
提供:在 poll()
和 offer()
.offer()
.
例如,让我们看一下poll()
。
请注意:
- 在
poll()
中,只有当我们阅读了size.get()!=0
时,我们才会阅读 size.get()!=0
只能发生在size.getAndIncrement()
之后offer()
size
是AtomicInteger
,这意味着它提供 happens-before 并使offer()
中的所有修改在 [=12= 中可见]
buffer[index]
换句话说:
buffer[writeIndex]=value
在offer()
- -(发生在之前)->
size.getAndIncrement()
在offer()
- -(发生在之前)->
size.get()!=0
在poll()
- -(发生在之前)->
T x = buffer[index]
在poll()