基于资源 ID 的锁定管理器中的竞争条件
Race condition in Resource ID based locking manager
我正在尝试编写一个将从多个线程调用的锁定管理器。该管理器根据各种资源 ID 处理锁定。这些可能会有很大差异,因此为每个保留内存锁可能会导致大量内存使用。这就是为什么一个锁不再被使用后(使用它的线程数达到0),它会被从内存中移除。
它可以根据请求的资源ID独占锁定线程(如果两个线程锁定同一个ID,一个将等待另一个解锁),或者使用ReentrantReadWriteLock完全排除所有其他线程。
我遇到了一种竞争情况,当锁被最后一个持有它的线程解锁时,锁从内存中移除,但其他线程仍然试图解锁它?这会导致我无法解释的 NPE。
我试过使用 AtomicInteger 而不是当前的 volatile 变量,认为它可能与此有关,但结果相似。
这是有问题的 class:
/**
* This class provides locks for reading and writing, and bulk operations lock on the entire class.
*
* If a bulk operation is not in progress, class based locking is transparent.
* @author KiralyCraft
*
*/
public class ReadWriteHighLevelLocking
{
private class Semaphore
{
private ReentrantLock lock;
private volatile int acquiredLocks;
public Semaphore()
{
this.acquiredLocks = 0;
this.lock = new ReentrantLock();
}
public synchronized int incrementAndGet()
{
return ++acquiredLocks;
}
public synchronized int decrementAndGet()
{
return --acquiredLocks;
}
}
private ReentrantReadWriteLock classBasedLock;
private volatile HashMap<String, Semaphore> stateChangeLocks;
public ReadWriteHighLevelLocking()
{
this.stateChangeLocks = new HashMap<String,Semaphore>();
this.classBasedLock = new ReentrantReadWriteLock();
}
/**
* Acquires a lock for the specified resource ID.
*
* May block if another thread is currently holding a bulk lock.
* @param resourceID
*/
public void acquireLock(String resourceID)
{
classBasedLock.readLock().lock(); //Using it reversed. There can be any number of operations (using the read lock), but only one bulk operation (sacrifice)
Semaphore stateChangeLock;
synchronized(stateChangeLocks)
{
if ((stateChangeLock = stateChangeLocks.get(resourceID))==null)
{
stateChangeLocks.put(resourceID, (stateChangeLock = new Semaphore()));
}
}
stateChangeLock.lock.lock();
stateChangeLock.incrementAndGet();
}
public void releaseLock(String resourceID)
{
Semaphore stateChangeLock;
synchronized(stateChangeLocks)
{
stateChangeLock = stateChangeLocks.get(resourceID);
if (stateChangeLock.decrementAndGet() == 0) //<----------------- HERE IS THE NPE
{
stateChangeLocks.remove(resourceID);
}
}
stateChangeLock.lock.unlock();
classBasedLock.readLock().unlock();
}
/**
* When a bulk lock is acquired, all other operations are delayed until this one is released.
*/
public void acquireBulkLock()
{
classBasedLock.writeLock().lock(); //Using it reversed. There can be any number of writers (using the read lock), but only one reader (sacrifice)
}
public void releaseBulkLock()
{
classBasedLock.writeLock().unlock();
}
}
调用者示例 class:
public abstract class AbstractDatabaseLockingController
{
...
private ReadWriteHighLevelLocking highLevelLock;
public AbstractDatabaseLockingController(DatabaseInterface db)
{
this.db = db;
this.highLevelLock = new ReadWriteHighLevelLocking();
}
...
public <T extends DatabaseIdentifiable> boolean executeSynchronizedUpdate(T theEntity,AbstractSynchronousOperation<T> aso)
{
boolean toReturn;
String lockID = theEntity.getId()+theEntity.getClass().getSimpleName();
highLevelLock.acquireLock(lockID);
toReturn = aso.execute(theEntity,db);
highLevelLock.releaseLock(lockID);
return toReturn;
}
...
public <T extends DatabaseIdentifiable> List<T> executeSynchronizedGetAllWhereFetch(Class<T> objectType, DatabaseQuerySupplier<T> dqs)
{
List<T> toReturn;
highLevelLock.acquireBulkLock();
toReturn = db.getAllWhere(objectType, dqs);
highLevelLock.releaseBulkLock();
return toReturn;
}
}
注意:所有使用这种锁定管理器的地方都遵循示例 class 中的 acquire/release 模式。它基本上是唯一使用它的地方。其他线程可能会通过样本class的children
间接调用上述方法
我似乎已经通过更新以下代码解决了这个问题:
synchronized(stateChangeLocks)
{
if ((stateChangeLock = stateChangeLocks.get(resourceID))==null)
{
stateChangeLocks.put(resourceID, (stateChangeLock = new Semaphore()));
}
}
stateChangeLock.lock.lock();
stateChangeLock.incrementAndGet();
至
synchronized(stateChangeLocks)
{
if ((stateChangeLock = stateChangeLocks.get(resourceID))==null)
{
stateChangeLocks.put(resourceID, (stateChangeLock = new Semaphore()));
}
stateChangeLock.incrementAndGet();
}
stateChangeLock.lock.lock();
我正在尝试编写一个将从多个线程调用的锁定管理器。该管理器根据各种资源 ID 处理锁定。这些可能会有很大差异,因此为每个保留内存锁可能会导致大量内存使用。这就是为什么一个锁不再被使用后(使用它的线程数达到0),它会被从内存中移除。
它可以根据请求的资源ID独占锁定线程(如果两个线程锁定同一个ID,一个将等待另一个解锁),或者使用ReentrantReadWriteLock完全排除所有其他线程。
我遇到了一种竞争情况,当锁被最后一个持有它的线程解锁时,锁从内存中移除,但其他线程仍然试图解锁它?这会导致我无法解释的 NPE。
我试过使用 AtomicInteger 而不是当前的 volatile 变量,认为它可能与此有关,但结果相似。
这是有问题的 class:
/**
* This class provides locks for reading and writing, and bulk operations lock on the entire class.
*
* If a bulk operation is not in progress, class based locking is transparent.
* @author KiralyCraft
*
*/
public class ReadWriteHighLevelLocking
{
private class Semaphore
{
private ReentrantLock lock;
private volatile int acquiredLocks;
public Semaphore()
{
this.acquiredLocks = 0;
this.lock = new ReentrantLock();
}
public synchronized int incrementAndGet()
{
return ++acquiredLocks;
}
public synchronized int decrementAndGet()
{
return --acquiredLocks;
}
}
private ReentrantReadWriteLock classBasedLock;
private volatile HashMap<String, Semaphore> stateChangeLocks;
public ReadWriteHighLevelLocking()
{
this.stateChangeLocks = new HashMap<String,Semaphore>();
this.classBasedLock = new ReentrantReadWriteLock();
}
/**
* Acquires a lock for the specified resource ID.
*
* May block if another thread is currently holding a bulk lock.
* @param resourceID
*/
public void acquireLock(String resourceID)
{
classBasedLock.readLock().lock(); //Using it reversed. There can be any number of operations (using the read lock), but only one bulk operation (sacrifice)
Semaphore stateChangeLock;
synchronized(stateChangeLocks)
{
if ((stateChangeLock = stateChangeLocks.get(resourceID))==null)
{
stateChangeLocks.put(resourceID, (stateChangeLock = new Semaphore()));
}
}
stateChangeLock.lock.lock();
stateChangeLock.incrementAndGet();
}
public void releaseLock(String resourceID)
{
Semaphore stateChangeLock;
synchronized(stateChangeLocks)
{
stateChangeLock = stateChangeLocks.get(resourceID);
if (stateChangeLock.decrementAndGet() == 0) //<----------------- HERE IS THE NPE
{
stateChangeLocks.remove(resourceID);
}
}
stateChangeLock.lock.unlock();
classBasedLock.readLock().unlock();
}
/**
* When a bulk lock is acquired, all other operations are delayed until this one is released.
*/
public void acquireBulkLock()
{
classBasedLock.writeLock().lock(); //Using it reversed. There can be any number of writers (using the read lock), but only one reader (sacrifice)
}
public void releaseBulkLock()
{
classBasedLock.writeLock().unlock();
}
}
调用者示例 class:
public abstract class AbstractDatabaseLockingController
{
...
private ReadWriteHighLevelLocking highLevelLock;
public AbstractDatabaseLockingController(DatabaseInterface db)
{
this.db = db;
this.highLevelLock = new ReadWriteHighLevelLocking();
}
...
public <T extends DatabaseIdentifiable> boolean executeSynchronizedUpdate(T theEntity,AbstractSynchronousOperation<T> aso)
{
boolean toReturn;
String lockID = theEntity.getId()+theEntity.getClass().getSimpleName();
highLevelLock.acquireLock(lockID);
toReturn = aso.execute(theEntity,db);
highLevelLock.releaseLock(lockID);
return toReturn;
}
...
public <T extends DatabaseIdentifiable> List<T> executeSynchronizedGetAllWhereFetch(Class<T> objectType, DatabaseQuerySupplier<T> dqs)
{
List<T> toReturn;
highLevelLock.acquireBulkLock();
toReturn = db.getAllWhere(objectType, dqs);
highLevelLock.releaseBulkLock();
return toReturn;
}
}
注意:所有使用这种锁定管理器的地方都遵循示例 class 中的 acquire/release 模式。它基本上是唯一使用它的地方。其他线程可能会通过样本class的children
间接调用上述方法我似乎已经通过更新以下代码解决了这个问题:
synchronized(stateChangeLocks)
{
if ((stateChangeLock = stateChangeLocks.get(resourceID))==null)
{
stateChangeLocks.put(resourceID, (stateChangeLock = new Semaphore()));
}
}
stateChangeLock.lock.lock();
stateChangeLock.incrementAndGet();
至
synchronized(stateChangeLocks)
{
if ((stateChangeLock = stateChangeLocks.get(resourceID))==null)
{
stateChangeLocks.put(resourceID, (stateChangeLock = new Semaphore()));
}
stateChangeLock.incrementAndGet();
}
stateChangeLock.lock.lock();