关于 Java 并发程序的一个奇怪错误(使用信号量)
A strange bug about a Java concurrency program (using semaphore)
我运行遇到的问题就是每次producer填满arraylist buffer的时候程序都会停止执行。但是,理论上。消费者进程仍然可以进入 get() 函数,导致 notEmpty 信号量现在获得的信号值高达 10。所以生产者进程在消费者进程将信号值释放回信号量notFull后仍然可以工作。
但是就是停了,我也查不出问题所在
程序如下:
import java.util.List;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Semaphore;
/**
* A bounded buffer maintains a fixed number of "slots". Items can be
* inserted into and removed from the buffer. The buffer has a maximum
* size.
*/
class BoundedBuffer2
{
// the maximum size of the bounded buffer
final public static int MAXSIZE = 10;
Semaphore notEmpty = new Semaphore(0);
Semaphore notFull = new Semaphore(MAXSIZE);
Semaphore sema = new Semaphore(1);
// the buffer
volatile List<Integer> buffer;
public BoundedBuffer2()
{
buffer = new ArrayList<Integer>();
}
// add an element to the end of the buffer if it is not full
public synchronized void put(int input)
throws InterruptedException
{
notFull.acquire();
sema.acquire();
buffer.add(input);
sema.release();
notEmpty.release();
}
// take an element from the front of the buffer
public synchronized int get()
throws InterruptedException
{
notEmpty.acquire();
sema.acquire();
int result = buffer.remove(0);
sema.release();
notFull.release();
return result;
}
public int size()
{
int result = buffer.size();
return result;
}
}
/**
* An instance of the Producer class produces new integers at random
* intervals, and inserts them into a bounded buffer.
*/
class Producer2 extends Thread
{
// the buffer in which to insert new integers
BoundedBuffer2 buffer;
public Producer2(BoundedBuffer2 buffer)
{
this.buffer = buffer;
}
public void run()
{
Random random = new Random();
try {
while (true) {
Thread.sleep(100);
//insert a random integer
int next = random.nextInt();
buffer.put(next);
System.err.println("b.size() increases to " + buffer.size());
}
}
catch (InterruptedException e) {}
}
}
/**
* An instance of the Consumer class consumes integers from a bounded
* buffer at random intervals.
*/
class Consumer2 extends Thread
{
// the buffer in which to insert new integers
BoundedBuffer2 buffer;
public Consumer2(BoundedBuffer2 buffer)
{
this.buffer = buffer;
}
public void run()
{
Random random = new Random();
try {
while (true) {
Thread.sleep(200);
//get the next integer from the buffer
int next = buffer.get();
System.err.println("next = " + next);
System.err.println("b.size() reducted to " + buffer.size());
}
}
catch (InterruptedException e) {}
}
}
public class UseBuffer2
{
public static void main(String [] args)
{
BoundedBuffer2 buffer = new BoundedBuffer2();
Producer2 p = new Producer2(buffer);
Consumer2 c = new Consumer2(buffer);
p.start();
c.start();
}
}
这是控制台的输出:
b.size() increases to 1
b.size() increases to 2
next = 400524264
b.size() reducted to 1
b.size() increases to 2
b.size() increases to 3
next = 241523118
b.size() reducted to 2
b.size() increases to 3
next = -1618289090
b.size() reducted to 2
b.size() increases to 3
b.size() increases to 4
next = -316455080
b.size() reducted to 3
b.size() increases to 4
b.size() increases to 5
next = 338682909
b.size() reducted to 4
b.size() increases to 5
b.size() increases to 6
next = -961276708
b.size() reducted to 5
b.size() increases to 6
b.size() increases to 7
next = 2056804692
b.size() reducted to 6
b.size() increases to 7
b.size() increases to 8
next = -301063524
b.size() reducted to 7
b.size() increases to 8
b.size() increases to 9
next = -148582342
b.size() reducted to 8
b.size() increases to 9
b.size() increases to 10
next = -2076430410
b.size() reducted to 9
b.size() increases to 10
您的 put()
和 get()
方法已同步。因此,如果生产者进入 put()
方法,试图获取 notFull
信号量,但不能,因为缓冲区已满,它会永远阻塞,保持进入同步方法时获取的锁。所以消费者无法进入get()方法,也无法从缓冲区中取出元素。
了解 synchronized 关键字的工作原理:https://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
相关部分:
it is not possible for two invocations of synchronized methods on the
same object to interleave. When one thread is executing a synchronized
method for an object, all other threads that invoke synchronized
methods for the same object block (suspend execution) until the first
thread is done with the object.
我运行遇到的问题就是每次producer填满arraylist buffer的时候程序都会停止执行。但是,理论上。消费者进程仍然可以进入 get() 函数,导致 notEmpty 信号量现在获得的信号值高达 10。所以生产者进程在消费者进程将信号值释放回信号量notFull后仍然可以工作。
但是就是停了,我也查不出问题所在
程序如下:
import java.util.List;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Semaphore;
/**
* A bounded buffer maintains a fixed number of "slots". Items can be
* inserted into and removed from the buffer. The buffer has a maximum
* size.
*/
class BoundedBuffer2
{
// the maximum size of the bounded buffer
final public static int MAXSIZE = 10;
Semaphore notEmpty = new Semaphore(0);
Semaphore notFull = new Semaphore(MAXSIZE);
Semaphore sema = new Semaphore(1);
// the buffer
volatile List<Integer> buffer;
public BoundedBuffer2()
{
buffer = new ArrayList<Integer>();
}
// add an element to the end of the buffer if it is not full
public synchronized void put(int input)
throws InterruptedException
{
notFull.acquire();
sema.acquire();
buffer.add(input);
sema.release();
notEmpty.release();
}
// take an element from the front of the buffer
public synchronized int get()
throws InterruptedException
{
notEmpty.acquire();
sema.acquire();
int result = buffer.remove(0);
sema.release();
notFull.release();
return result;
}
public int size()
{
int result = buffer.size();
return result;
}
}
/**
* An instance of the Producer class produces new integers at random
* intervals, and inserts them into a bounded buffer.
*/
class Producer2 extends Thread
{
// the buffer in which to insert new integers
BoundedBuffer2 buffer;
public Producer2(BoundedBuffer2 buffer)
{
this.buffer = buffer;
}
public void run()
{
Random random = new Random();
try {
while (true) {
Thread.sleep(100);
//insert a random integer
int next = random.nextInt();
buffer.put(next);
System.err.println("b.size() increases to " + buffer.size());
}
}
catch (InterruptedException e) {}
}
}
/**
* An instance of the Consumer class consumes integers from a bounded
* buffer at random intervals.
*/
class Consumer2 extends Thread
{
// the buffer in which to insert new integers
BoundedBuffer2 buffer;
public Consumer2(BoundedBuffer2 buffer)
{
this.buffer = buffer;
}
public void run()
{
Random random = new Random();
try {
while (true) {
Thread.sleep(200);
//get the next integer from the buffer
int next = buffer.get();
System.err.println("next = " + next);
System.err.println("b.size() reducted to " + buffer.size());
}
}
catch (InterruptedException e) {}
}
}
public class UseBuffer2
{
public static void main(String [] args)
{
BoundedBuffer2 buffer = new BoundedBuffer2();
Producer2 p = new Producer2(buffer);
Consumer2 c = new Consumer2(buffer);
p.start();
c.start();
}
}
这是控制台的输出:
b.size() increases to 1
b.size() increases to 2
next = 400524264
b.size() reducted to 1
b.size() increases to 2
b.size() increases to 3
next = 241523118
b.size() reducted to 2
b.size() increases to 3
next = -1618289090
b.size() reducted to 2
b.size() increases to 3
b.size() increases to 4
next = -316455080
b.size() reducted to 3
b.size() increases to 4
b.size() increases to 5
next = 338682909
b.size() reducted to 4
b.size() increases to 5
b.size() increases to 6
next = -961276708
b.size() reducted to 5
b.size() increases to 6
b.size() increases to 7
next = 2056804692
b.size() reducted to 6
b.size() increases to 7
b.size() increases to 8
next = -301063524
b.size() reducted to 7
b.size() increases to 8
b.size() increases to 9
next = -148582342
b.size() reducted to 8
b.size() increases to 9
b.size() increases to 10
next = -2076430410
b.size() reducted to 9
b.size() increases to 10
您的 put()
和 get()
方法已同步。因此,如果生产者进入 put()
方法,试图获取 notFull
信号量,但不能,因为缓冲区已满,它会永远阻塞,保持进入同步方法时获取的锁。所以消费者无法进入get()方法,也无法从缓冲区中取出元素。
了解 synchronized 关键字的工作原理:https://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
相关部分:
it is not possible for two invocations of synchronized methods on the same object to interleave. When one thread is executing a synchronized method for an object, all other threads that invoke synchronized methods for the same object block (suspend execution) until the first thread is done with the object.