自定义数组阻塞队列

Custom Array Blocking Queue

我正在尝试实现阻塞队列功能,但线程进入等待状态。无法弄清楚可能出了什么问题。我在线尝试了一些实现,但 none 正在运行。也许我的执行者代码是错误的。但是如果我用 ArrayBlockingQueue 替换 MyBlockingQueue 一切正常。

以下是两种主要方法。

public synchronized void put(Integer i) throws InterruptedException {

    if (a.size() == capacity) {
        wait();
    }
    a.add(i);
    notifyAll();
}

public synchronized void take() throws InterruptedException {

    if (a.isEmpty()) {
        wait();
    }
    a.remove(0);
    notifyAll();
}

代码:

public class App {

    public static MyBlockingQueue q = new MyBlockingQueue(10);

    // public static ArrayBlockingQueue q = new ArrayBlockingQueue(10);

    public void method1() throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            q.put(i);
            System.out.println(q);
        }
    }

    public void method2() throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            q.take();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        App a = new App();

        ExecutorService executor1 = Executors.newFixedThreadPool(20);
        ExecutorService executor2 = Executors.newFixedThreadPool(20);

        for (int i = 0; i < 2; i++) {

            executor1.submit(new Runnable() {

                public void run() {

                    try {
                        a.method1();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }

        for (int i = 0; i < 2; i++) {

            executor2.submit(new Runnable() {

                public void run() {

                    try {
                        a.method2();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }

        executor1.shutdown();
        executor2.shutdown();

        executor1.awaitTermination(1, TimeUnit.DAYS);
        executor2.awaitTermination(2, TimeUnit.DAYS);

        System.out.println();
        System.out.println("The final queue is:");
        System.out.println(App.q);

    }

}

class MyBlockingQueue {

private ArrayList<Integer> a;
    private int capacity;

    public MyBlockingQueue(Integer cap) {
        capacity = cap;
        a = new ArrayList<Integer>(capacity);
    }

    @Override
    public String toString() {
        String output = "";
        for (Integer i : a) {
            output += i.toString() + " ";
        }
        return "[" + output + "]";
    }

    public synchronized void put(Integer i) throws InterruptedException {

        if (a.size() == capacity) {
            wait();
        }
        a.add(i);
        notifyAll();
    }

    public synchronized void take() throws InterruptedException {

        if (a.isEmpty()) {
            wait();
        }
        a.remove(0);
        notifyAll();
    }
}

方法 put 和 take in ArrayBlockingQueue 应该与 java.util.concurrent.locks.ReentrantLock 同步。

示例:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // your logic 
        } finally {
            lock.unlock();
        }
    }

下面是坐在角落里的害羞的罪魁祸首,还需要将 if 固定为 while

@Override
public String toString() {
    String output = "";
    for (Integer i : a) {
        output += i.toString() + " ";
    }
    return "[" + output + "]";
}

我正在向解决方案添加 toString 方法,但它们不起作用。

让它 synchronized 一切正常。感谢@Maurice 提出这个建议。

问题是所有线程都在同一个监视器上等待,会被notifyAll和运行同时唤醒。

您可以使用方法notify唤醒单线程并进行单插入或删除。但是在 put() 内部调用的 notify() 可以唤醒等待在 put() 方法中插入 not full 条件的线程:

while (a.size() == capacity) {
        wait(); // many threads waiting
    }

并且在take()内部调用的notify()可以唤醒等待not empty条件的线程:

while (a.isEmpty()) {
        wait();
    }

因为所有线程都使用一个监视器,notify可以唤醒任何等待的线程。

因此您需要两台显示器:一台用于 not full 条件,一台用于 not empty

    Object notFull = new Object();
    Object notEmpty = new Object();

    public synchronized void put(Integer i) throws InterruptedException {

        while (a.size() == capacity) {
            notFull.wait(); 
        }
        a.add(i);
        notEmpty.notify(); //wake up one random thread in take() method
    }

    public synchronized void take() throws InterruptedException {

        if (a.isEmpty()) {
            notEmpty.wait();
        }
        a.remove(0);
        notFull.notify(); // wake up one random thread in put() method
    }

现在 notEmpty.notify()notFull.notify() 不会释放 puttake 方法中 synchronized 关键字获取的锁。

我们需要在同一个锁上同步这两个方法,并根据两个条件释放或获取它:未满和非空。为此有 java.util.concurrent.locks.ReentrantLock class:

A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.

这个class代表一个有条件的锁。它的方法[newCondition][2]创建条件:

Returns a Condition instance for use with this Lock instance.

Condition 允许通过 await() 方法挂起多个线程并通过 signal() 方法唤醒单个等待线程。没有 ReentrantLock 就不能使用。 当调用 Condition.await 时线程释放在 ReentrantLock.lock 方法中获取的锁。当调用 Condition.signal 时,等待线程获取 ReentrantLock。 最终代码:

/** Main lock guarding all access */
       private final ReentrantLock lock;
       /** Condition for waiting takes */
       private final Condition notEmpty;
       /** Condition for waiting puts */
       private final Condition notFull;

 public void put(Integer i) throws InterruptedException {
          lock.lockInterruptibly();
          try {
                  while (a.size() == capacity)
                      notFull.await();//realease monitor and sleep until notFull.signal is called

             a.add(i);
            notEmpty.signal();// wake up one random thread in take() method
          } finally {
              lock.unlock();
          }
 }

public void take() throws InterruptedException {
          lock.lockInterruptibly();
          try {
                   while (a.isEmpty())
                      notEmpty.await();//realease monitor and sleep until notEmpty.signal is called

                   a.remove(0);
             notFull.signal();// wake up one random thread in put() method
          } finally {
              lock.unlock();
          }       
}

ReentrantLock确保只有一个线程可以同时执行puttake方法。 Condition 允许根据条件挂起和恢复线程。