两把锁守护状态

Guarding state with two locks

作为 to the ,我想到了使用现有的阻塞结构blockingQueue2 并用两个不同的锁来保护状态。

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 184661285402L;

    private Object lock1 = new Object();//used in pause() and in take()
    private Object lock2 = new Object();//used in pause() and unpause()

    //@GuardedBy("lock1")
    private volatile boolean paused;

    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();

    public void pause() {
        if (!paused) {
            synchronized (lock1) {
            synchronized (lock2) {
                if (!paused) {
                    paused = true;
                    blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty
                }
            }
            }
        }
    }

    public void unpause() throws InterruptedException {
        if (paused) {
            synchronized (lock2) {
                paused = false;
                blockingQueue2.put(new Object());//will release waiting thread, if there is one
            }
        }
    }

    @Override
    public E take() throws InterruptedException {
        E result = super.take();

        if (paused) {
            synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
                if (paused) {
                    blockingQueue2.take();
                }
            }
        }

        return result;
    }

    //TODO override similarly the poll() method.
}

我需要两个不同的锁,否则 unpause() 可能会等待消费者线程已经在 take() 中持有的 lock1

我的问题:

  1. 这会不会陷入僵局?
  2. 它真的有效吗?
  3. 你多久看到这样的代码,因为我自己觉得它不可读?
  4. 我应该如何注释 paused 标志:使用 @GuardedBy("lock1, locks2")

PS:欢迎任何改进(除此之外我可以使用二进制信号量而不是 blockingQueue2)。

我会一一回答你的问题

Could this come to a deadlock?

不,你不会导致死锁。如果您以不同的顺序获得 lock1lock2,则可能会导致死锁。因为你在同时持有它们时以相同的顺序获得它们,所以你应该没问题。

Does it work at all?

看来。所有 happens-before 订单似乎都已满足。

How often did you see such code, as I myself don't find it readable?

我以前从未见过这种实现方式。我同意这不是很优雅。


我将提出一个使用 Phaser 的替代解决方案。可以说这不再优雅,只是另一种方法。看了一段时间,觉得够用了。当然,我也从未见过这种方法,但想起来很有趣。

public static class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 184661285402L;

    private final Phaser phaser = new Phaser(1);
    private volatile int phase = phaser.getPhase();

    public BlockingQueueWithPause() {
        // base case, all phase 0 await's will succeed through.
        phaser.arrive();
    }

    public void pause() {
        phase = phaser.getPhase();
    }

    public void unpause() throws InterruptedException {
        phaser.arrive();
    }

    @Override
    public E take() throws InterruptedException {
        phaser.awaitAdvance(phase);

        E result = super.take();

        return result;
    }
}

我想我应该解释一下这个解决方案。 Phaser 就像 CylicBarrier and CountDownLatch 有一个 child。它允许 re-using 屏障而不等待屏障跳闸。

在基本情况下,共享 phase 将为 0。由于在构造函数中调用了 arrive,因此 phaser 的内部相位为 1。因此,如果 take 在没有调用 pause 的情况下被调用 awaitAdvance 将在 0 上被调用。由于内部相位为 1,移相器 fast-path 退出并且是一个简单的易失性负载( 0 阶段已经发生,所以我们不再需要等待前进)。

如果调用 pause,共享的 phase 变量将更新为移相器的内部相位,现在为 1。因此 takeawaitTermination 在 1导致它暂停。

unpause 到达将导致所有线程 awaitAdvanceing 释放并将移相器的内部相位增加到 2。同样,后续的 take 将 fast-path 退出而没有相应的暂停。