两把锁守护状态
Guarding state with two locks
作为 to the ,我想到了使用现有的阻塞结构blockingQueue2
并用两个不同的锁来保护状态。
public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 184661285402L;
private Object lock1 = new Object();//used in pause() and in take()
private Object lock2 = new Object();//used in pause() and unpause()
//@GuardedBy("lock1")
private volatile boolean paused;
private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();
public void pause() {
if (!paused) {
synchronized (lock1) {
synchronized (lock2) {
if (!paused) {
paused = true;
blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty
}
}
}
}
}
public void unpause() throws InterruptedException {
if (paused) {
synchronized (lock2) {
paused = false;
blockingQueue2.put(new Object());//will release waiting thread, if there is one
}
}
}
@Override
public E take() throws InterruptedException {
E result = super.take();
if (paused) {
synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
if (paused) {
blockingQueue2.take();
}
}
}
return result;
}
//TODO override similarly the poll() method.
}
我需要两个不同的锁,否则 unpause()
可能会等待消费者线程已经在 take()
中持有的 lock1
。
我的问题:
- 这会不会陷入僵局?
- 它真的有效吗?
- 你多久看到这样的代码,因为我自己觉得它不可读?
- 我应该如何注释
paused
标志:使用 @GuardedBy("lock1, locks2")
?
PS:欢迎任何改进(除此之外我可以使用二进制信号量而不是 blockingQueue2
)。
我会一一回答你的问题
Could this come to a deadlock?
不,你不会导致死锁。如果您以不同的顺序获得 lock1
和 lock2
,则可能会导致死锁。因为你在同时持有它们时以相同的顺序获得它们,所以你应该没问题。
Does it work at all?
看来。所有 happens-before 订单似乎都已满足。
How often did you see such code, as I myself don't find it readable?
我以前从未见过这种实现方式。我同意这不是很优雅。
我将提出一个使用 Phaser 的替代解决方案。可以说这不再优雅,只是另一种方法。看了一段时间,觉得够用了。当然,我也从未见过这种方法,但想起来很有趣。
public static class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 184661285402L;
private final Phaser phaser = new Phaser(1);
private volatile int phase = phaser.getPhase();
public BlockingQueueWithPause() {
// base case, all phase 0 await's will succeed through.
phaser.arrive();
}
public void pause() {
phase = phaser.getPhase();
}
public void unpause() throws InterruptedException {
phaser.arrive();
}
@Override
public E take() throws InterruptedException {
phaser.awaitAdvance(phase);
E result = super.take();
return result;
}
}
我想我应该解释一下这个解决方案。 Phaser 就像 CylicBarrier and CountDownLatch 有一个 child。它允许 re-using 屏障而不等待屏障跳闸。
在基本情况下,共享 phase
将为 0。由于在构造函数中调用了 arrive
,因此 phaser
的内部相位为 1。因此,如果 take
在没有调用 pause
的情况下被调用 awaitAdvance
将在 0 上被调用。由于内部相位为 1,移相器 fast-path 退出并且是一个简单的易失性负载( 0 阶段已经发生,所以我们不再需要等待前进)。
如果调用 pause
,共享的 phase
变量将更新为移相器的内部相位,现在为 1。因此 take
将 awaitTermination
在 1导致它暂停。
unpause
到达将导致所有线程 awaitAdvance
ing 释放并将移相器的内部相位增加到 2。同样,后续的 take 将 fast-path 退出而没有相应的暂停。
作为blockingQueue2
并用两个不同的锁来保护状态。
public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 184661285402L;
private Object lock1 = new Object();//used in pause() and in take()
private Object lock2 = new Object();//used in pause() and unpause()
//@GuardedBy("lock1")
private volatile boolean paused;
private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();
public void pause() {
if (!paused) {
synchronized (lock1) {
synchronized (lock2) {
if (!paused) {
paused = true;
blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty
}
}
}
}
}
public void unpause() throws InterruptedException {
if (paused) {
synchronized (lock2) {
paused = false;
blockingQueue2.put(new Object());//will release waiting thread, if there is one
}
}
}
@Override
public E take() throws InterruptedException {
E result = super.take();
if (paused) {
synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
if (paused) {
blockingQueue2.take();
}
}
}
return result;
}
//TODO override similarly the poll() method.
}
我需要两个不同的锁,否则 unpause()
可能会等待消费者线程已经在 take()
中持有的 lock1
。
我的问题:
- 这会不会陷入僵局?
- 它真的有效吗?
- 你多久看到这样的代码,因为我自己觉得它不可读?
- 我应该如何注释
paused
标志:使用@GuardedBy("lock1, locks2")
?
PS:欢迎任何改进(除此之外我可以使用二进制信号量而不是 blockingQueue2
)。
我会一一回答你的问题
Could this come to a deadlock?
不,你不会导致死锁。如果您以不同的顺序获得 lock1
和 lock2
,则可能会导致死锁。因为你在同时持有它们时以相同的顺序获得它们,所以你应该没问题。
Does it work at all?
看来。所有 happens-before 订单似乎都已满足。
How often did you see such code, as I myself don't find it readable?
我以前从未见过这种实现方式。我同意这不是很优雅。
我将提出一个使用 Phaser 的替代解决方案。可以说这不再优雅,只是另一种方法。看了一段时间,觉得够用了。当然,我也从未见过这种方法,但想起来很有趣。
public static class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 184661285402L;
private final Phaser phaser = new Phaser(1);
private volatile int phase = phaser.getPhase();
public BlockingQueueWithPause() {
// base case, all phase 0 await's will succeed through.
phaser.arrive();
}
public void pause() {
phase = phaser.getPhase();
}
public void unpause() throws InterruptedException {
phaser.arrive();
}
@Override
public E take() throws InterruptedException {
phaser.awaitAdvance(phase);
E result = super.take();
return result;
}
}
我想我应该解释一下这个解决方案。 Phaser 就像 CylicBarrier and CountDownLatch 有一个 child。它允许 re-using 屏障而不等待屏障跳闸。
在基本情况下,共享 phase
将为 0。由于在构造函数中调用了 arrive
,因此 phaser
的内部相位为 1。因此,如果 take
在没有调用 pause
的情况下被调用 awaitAdvance
将在 0 上被调用。由于内部相位为 1,移相器 fast-path 退出并且是一个简单的易失性负载( 0 阶段已经发生,所以我们不再需要等待前进)。
如果调用 pause
,共享的 phase
变量将更新为移相器的内部相位,现在为 1。因此 take
将 awaitTermination
在 1导致它暂停。
unpause
到达将导致所有线程 awaitAdvance
ing 释放并将移相器的内部相位增加到 2。同样,后续的 take 将 fast-path 退出而没有相应的暂停。