自定义数组阻塞队列
Custom Array Blocking Queue
我正在尝试实现阻塞队列功能,但线程进入等待状态。无法弄清楚可能出了什么问题。我在线尝试了一些实现,但 none 正在运行。也许我的执行者代码是错误的。但是如果我用 ArrayBlockingQueue
替换 MyBlockingQueue
一切正常。
以下是两种主要方法。
public synchronized void put(Integer i) throws InterruptedException {
if (a.size() == capacity) {
wait();
}
a.add(i);
notifyAll();
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
wait();
}
a.remove(0);
notifyAll();
}
代码:
public class App {
public static MyBlockingQueue q = new MyBlockingQueue(10);
// public static ArrayBlockingQueue q = new ArrayBlockingQueue(10);
public void method1() throws InterruptedException {
for (int i = 0; i < 20; i++) {
q.put(i);
System.out.println(q);
}
}
public void method2() throws InterruptedException {
for (int i = 0; i < 20; i++) {
q.take();
}
}
public static void main(String[] args) throws InterruptedException {
App a = new App();
ExecutorService executor1 = Executors.newFixedThreadPool(20);
ExecutorService executor2 = Executors.newFixedThreadPool(20);
for (int i = 0; i < 2; i++) {
executor1.submit(new Runnable() {
public void run() {
try {
a.method1();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
for (int i = 0; i < 2; i++) {
executor2.submit(new Runnable() {
public void run() {
try {
a.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
executor1.shutdown();
executor2.shutdown();
executor1.awaitTermination(1, TimeUnit.DAYS);
executor2.awaitTermination(2, TimeUnit.DAYS);
System.out.println();
System.out.println("The final queue is:");
System.out.println(App.q);
}
}
class MyBlockingQueue {
private ArrayList<Integer> a;
private int capacity;
public MyBlockingQueue(Integer cap) {
capacity = cap;
a = new ArrayList<Integer>(capacity);
}
@Override
public String toString() {
String output = "";
for (Integer i : a) {
output += i.toString() + " ";
}
return "[" + output + "]";
}
public synchronized void put(Integer i) throws InterruptedException {
if (a.size() == capacity) {
wait();
}
a.add(i);
notifyAll();
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
wait();
}
a.remove(0);
notifyAll();
}
}
方法 put 和 take in ArrayBlockingQueue
应该与 java.util.concurrent.locks.ReentrantLock
同步。
示例:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// your logic
} finally {
lock.unlock();
}
}
下面是坐在角落里的害羞的罪魁祸首,还需要将 if
固定为 while
。
@Override
public String toString() {
String output = "";
for (Integer i : a) {
output += i.toString() + " ";
}
return "[" + output + "]";
}
我正在向解决方案添加 toString 方法,但它们不起作用。
让它 synchronized
一切正常。感谢@Maurice 提出这个建议。
问题是所有线程都在同一个监视器上等待,会被notifyAll和运行同时唤醒。
您可以使用方法notify
唤醒单线程并进行单插入或删除。但是在 put()
内部调用的 notify()
可以唤醒等待在 put()
方法中插入 not full
条件的线程:
while (a.size() == capacity) {
wait(); // many threads waiting
}
并且在take()
内部调用的notify()
可以唤醒等待not empty
条件的线程:
while (a.isEmpty()) {
wait();
}
因为所有线程都使用一个监视器,notify
可以唤醒任何等待的线程。
因此您需要两台显示器:一台用于 not full
条件,一台用于 not empty
。
Object notFull = new Object();
Object notEmpty = new Object();
public synchronized void put(Integer i) throws InterruptedException {
while (a.size() == capacity) {
notFull.wait();
}
a.add(i);
notEmpty.notify(); //wake up one random thread in take() method
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
notEmpty.wait();
}
a.remove(0);
notFull.notify(); // wake up one random thread in put() method
}
现在 notEmpty.notify()
和 notFull.notify()
不会释放 put
或 take
方法中 synchronized
关键字获取的锁。
我们需要在同一个锁上同步这两个方法,并根据两个条件释放或获取它:未满和非空。为此有 java.util.concurrent.locks.ReentrantLock
class:
A reentrant mutual exclusion Lock with the same basic behavior and
semantics as the implicit monitor lock accessed using synchronized
methods and statements, but with extended capabilities.
这个class代表一个有条件的锁。它的方法[newCondition][2]
创建条件:
Returns a Condition instance for use with this Lock instance.
Condition 允许通过 await()
方法挂起多个线程并通过 signal()
方法唤醒单个等待线程。没有 ReentrantLock 就不能使用。
当调用 Condition.await
时线程释放在 ReentrantLock.lock
方法中获取的锁。当调用 Condition.signal
时,等待线程获取 ReentrantLock
。
最终代码:
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public void put(Integer i) throws InterruptedException {
lock.lockInterruptibly();
try {
while (a.size() == capacity)
notFull.await();//realease monitor and sleep until notFull.signal is called
a.add(i);
notEmpty.signal();// wake up one random thread in take() method
} finally {
lock.unlock();
}
}
public void take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (a.isEmpty())
notEmpty.await();//realease monitor and sleep until notEmpty.signal is called
a.remove(0);
notFull.signal();// wake up one random thread in put() method
} finally {
lock.unlock();
}
}
ReentrantLock
确保只有一个线程可以同时执行put
或take
方法。 Condition
允许根据条件挂起和恢复线程。
我正在尝试实现阻塞队列功能,但线程进入等待状态。无法弄清楚可能出了什么问题。我在线尝试了一些实现,但 none 正在运行。也许我的执行者代码是错误的。但是如果我用 ArrayBlockingQueue
替换 MyBlockingQueue
一切正常。
以下是两种主要方法。
public synchronized void put(Integer i) throws InterruptedException {
if (a.size() == capacity) {
wait();
}
a.add(i);
notifyAll();
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
wait();
}
a.remove(0);
notifyAll();
}
代码:
public class App {
public static MyBlockingQueue q = new MyBlockingQueue(10);
// public static ArrayBlockingQueue q = new ArrayBlockingQueue(10);
public void method1() throws InterruptedException {
for (int i = 0; i < 20; i++) {
q.put(i);
System.out.println(q);
}
}
public void method2() throws InterruptedException {
for (int i = 0; i < 20; i++) {
q.take();
}
}
public static void main(String[] args) throws InterruptedException {
App a = new App();
ExecutorService executor1 = Executors.newFixedThreadPool(20);
ExecutorService executor2 = Executors.newFixedThreadPool(20);
for (int i = 0; i < 2; i++) {
executor1.submit(new Runnable() {
public void run() {
try {
a.method1();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
for (int i = 0; i < 2; i++) {
executor2.submit(new Runnable() {
public void run() {
try {
a.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
executor1.shutdown();
executor2.shutdown();
executor1.awaitTermination(1, TimeUnit.DAYS);
executor2.awaitTermination(2, TimeUnit.DAYS);
System.out.println();
System.out.println("The final queue is:");
System.out.println(App.q);
}
}
class MyBlockingQueue {
private ArrayList<Integer> a;
private int capacity;
public MyBlockingQueue(Integer cap) {
capacity = cap;
a = new ArrayList<Integer>(capacity);
}
@Override
public String toString() {
String output = "";
for (Integer i : a) {
output += i.toString() + " ";
}
return "[" + output + "]";
}
public synchronized void put(Integer i) throws InterruptedException {
if (a.size() == capacity) {
wait();
}
a.add(i);
notifyAll();
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
wait();
}
a.remove(0);
notifyAll();
}
}
方法 put 和 take in ArrayBlockingQueue
应该与 java.util.concurrent.locks.ReentrantLock
同步。
示例:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// your logic
} finally {
lock.unlock();
}
}
下面是坐在角落里的害羞的罪魁祸首,还需要将 if
固定为 while
。
@Override
public String toString() {
String output = "";
for (Integer i : a) {
output += i.toString() + " ";
}
return "[" + output + "]";
}
我正在向解决方案添加 toString 方法,但它们不起作用。
让它 synchronized
一切正常。感谢@Maurice 提出这个建议。
问题是所有线程都在同一个监视器上等待,会被notifyAll和运行同时唤醒。
您可以使用方法notify
唤醒单线程并进行单插入或删除。但是在 put()
内部调用的 notify()
可以唤醒等待在 put()
方法中插入 not full
条件的线程:
while (a.size() == capacity) {
wait(); // many threads waiting
}
并且在take()
内部调用的notify()
可以唤醒等待not empty
条件的线程:
while (a.isEmpty()) {
wait();
}
因为所有线程都使用一个监视器,notify
可以唤醒任何等待的线程。
因此您需要两台显示器:一台用于 not full
条件,一台用于 not empty
。
Object notFull = new Object();
Object notEmpty = new Object();
public synchronized void put(Integer i) throws InterruptedException {
while (a.size() == capacity) {
notFull.wait();
}
a.add(i);
notEmpty.notify(); //wake up one random thread in take() method
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
notEmpty.wait();
}
a.remove(0);
notFull.notify(); // wake up one random thread in put() method
}
现在 notEmpty.notify()
和 notFull.notify()
不会释放 put
或 take
方法中 synchronized
关键字获取的锁。
我们需要在同一个锁上同步这两个方法,并根据两个条件释放或获取它:未满和非空。为此有 java.util.concurrent.locks.ReentrantLock
class:
A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.
这个class代表一个有条件的锁。它的方法[newCondition][2]
创建条件:
Returns a Condition instance for use with this Lock instance.
Condition 允许通过 await()
方法挂起多个线程并通过 signal()
方法唤醒单个等待线程。没有 ReentrantLock 就不能使用。
当调用 Condition.await
时线程释放在 ReentrantLock.lock
方法中获取的锁。当调用 Condition.signal
时,等待线程获取 ReentrantLock
。
最终代码:
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public void put(Integer i) throws InterruptedException {
lock.lockInterruptibly();
try {
while (a.size() == capacity)
notFull.await();//realease monitor and sleep until notFull.signal is called
a.add(i);
notEmpty.signal();// wake up one random thread in take() method
} finally {
lock.unlock();
}
}
public void take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (a.isEmpty())
notEmpty.await();//realease monitor and sleep until notEmpty.signal is called
a.remove(0);
notFull.signal();// wake up one random thread in put() method
} finally {
lock.unlock();
}
}
ReentrantLock
确保只有一个线程可以同时执行put
或take
方法。 Condition
允许根据条件挂起和恢复线程。