同步缓存项

Synchronizing on cached items

我正在使用类似

的东西
Cache<Integer, Item> cache;

其中 Item 彼此独立并且看起来像

private static class Item {
    private final int id;
    ... some mutable data

    synchronized doSomething() {...}
    synchronized doSomethingElse() {...}
}

想法是从缓存中获取项目并在其上调用同步方法。万一遗漏了,可以重新创建项目,没关系。

在线程运行同步方法时,当项目从缓存中逐出并重新创建时会出现问题。一个新线程获取一个新项目并对其进行同步...因此对于单个 id,同步方法中有两个线程。失败。

有没有简单的解决方法?如果有帮助,就Guava Cache

我确信您的问题有多种解决方案。 我通过为每个 ietmId 使用唯一锁记下了其中一个:

public class LockManager {

private Map<Integer, Lock> lockMap = new ConcurrentHashMap<>();

public synchronized Lock getOrCreateLockForId(Integer itemId) {
    Lock lock;
    if (lockMap.containsKey(itemId)) {
        System.out.println("Get lock");
        lock = lockMap.get(itemId);
    } else {
        System.out.println("Create lock");
        lock = new ReentrantLock();
        lockMap.put(itemId, lock);
    }
    return lock;
}

public synchronized Lock getLockForId(Integer itemId) {
    Lock lock;
    if (lockMap.containsKey(itemId)) {
        System.out.println("get lock");
        return lockMap.get(itemId);
    } else {
        throw new IllegalStateException("First lock, than unlock");
    }
}

}

因此,不是在 class 项目中使用 synchronised 方法,而是使用 LockManager 通过 itemId 获取 Lock 并在检索后调用 lock.lock()。 另请注意,LockManager 应具有单例范围,并且应在所有用途之间共享同一实例。

您可以在下面看到 LockManager 使用的示例:

            try {
            lockManager.getOrCreateLockForId(itemId).lock();
            System.out.println("start doing something" + num);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("completed doing something" + num);
        } finally {
            lockManager.getLockForId(itemId).unlock();
        }

我觉得Louis的建议,用钥匙上锁是最简单实用的。这是一些代码片段,没有 Guava 库的帮助,说明了这个想法:

static locks[] = new Lock[ ... ];
static { /* initialize lock array */ } 
int id;
void doSomething() {
  final lock = locks[id % locks.length];
  lock.lock();
  try {
    /* protected code */
  } finally {
    lock.unlock();
  } 
}

锁数组的大小限制了您获得的最大并行度。如果您的代码仅使用 CPU,您可以通过可用处理器的数量对其进行初始化,这是完美的解决方案。如果您的代码等待 I/O,您可能需要一个任意大的锁数组,或者您限制可以 运行 临界区的线程数。在这种情况下,另一种方法可能更好。

更概念层面的评论:

如果你想防止物品被逐出,你需要一个叫做pinning的机制。在内部,这被大多数缓存实现使用,例如用于在 I/O 操作期间进行阻塞。某些缓存可能会公开应用程序执行此操作的方法。

在兼容JCache 的缓存中,有一个EntryProcessor 的概念。 EntryProcessor 允许您以原子方式处理条目上的少量代码。这意味着缓存正在为你做所有的锁定。根据问题的范围,这可能有优势,因为这也适用于集群场景,这意味着锁定是集群范围的。

我想到的另一个想法是可否决驱逐。这是 EHCache 3 正在实现的概念。通过指定可否决驱逐政策,您可以自行实施固定机制。