有效锁定资源,由字符串标识

Efficient locking on a resource, identified by a string

编辑:我更新了示例以使用 https://github.com/StephenCleary/AsyncEx 库。仍在等待可用的提示。

有资源,由字符串标识(例如文件、URL等)。我正在寻找资源的锁定机制。我找到了 2 种不同的解决方案,但每种都有其问题:

首先是使用 ConcurrentDictionary class 和 AsyncLock:

using Nito.AsyncEx;
using System.Collections.Concurrent;

internal static class Locking {
    private static ConcurrentDictionary<string, AsyncLock> mutexes
        = new ConcurrentDictionary<string, AsyncLock>();

    internal static AsyncLock GetMutex(string resourceLocator) {
        return mutexes.GetOrAdd(
            resourceLocator,
            key => new AsyncLock()
        );
    }
}

异步用法:

using (await Locking.GetMutex("resource_string").LockAsync()) {
    ...
}

同步用法:

using (Locking.GetMutex("resource_string").Lock()) {
    ...
}

这很安全,但问题是字典变得越来越大,而且当没有人在等待锁时,我没有看到从字典中删除项目的线程安全方法。 (我也想避免全局锁。)

我的第二个解决方案将字符串散列为 0N - 1 之间的数字,并锁定这些:

using Nito.AsyncEx;
using System.Collections.Concurrent;

internal static class Locking {
    private const UInt32 BUCKET_COUNT = 4096;

    private static ConcurrentDictionary<UInt32, AsyncLock> mutexes
        = new ConcurrentDictionary<UInt32, AsyncLock>();

    private static UInt32 HashStringToInt(string text) {
        return ((UInt32)text.GetHashCode()) % BUCKET_COUNT;
    }

    internal static AsyncLock GetMutex(string resourceLocator) {
        return mutexes.GetOrAdd(
            HashStringToInt(resourceLocator),
            key => new AsyncLock()
        );
    }
}

可以看出,第二种方案只是降低了碰撞的概率,并没有避免碰撞。我最担心的是它会导致死锁:避免死锁的主要策略是始终按特定顺序锁定项目。但是使用这种方法,不同的项目可以以不同的顺序映射到相同的桶,比如:(A->X,B->Y),(C->Y,D->X)。因此,使用此解决方案无法安全地锁定多个资源。

有更好的解决办法吗? (我也欢迎对以上两种解决方案提出批评。)

您可以通过在字典停止使用时从字典中删除锁来改进第一个解决方案。然后可以将删除的锁添加到一个小池中,这样下次您需要锁时,您只需从池中获取一个锁而不是创建一个新锁。


更新: 下面是这个想法的一个实现。它基于 SemaphoreSlims 而不是 Stephen Cleary 的 AsyncLocks,因为为了从字典中删除未使用的信号量,需要自定义一次性。

public class MultiLock<TKey>
{
    private object Locker { get; } = new object();
    private Dictionary<TKey, LockItem> Dictionary { get; }
    private Queue<LockItem> Pool { get; }
    private int PoolSize { get; }

    public MultiLock(int poolSize = 10)
    {
        Dictionary = new Dictionary<TKey, LockItem>();
        Pool = new Queue<LockItem>(poolSize);
        PoolSize = poolSize;
    }

    public WaitResult Wait(TKey key,
        int millisecondsTimeout = Timeout.Infinite,
        CancellationToken cancellationToken = default)
    {
        var lockItem = GetLockItem(key);
        bool acquired;
        try
        {
            acquired = lockItem.Semaphore.Wait(millisecondsTimeout,
                cancellationToken);
        }
        catch
        {
            ReleaseLockItem(lockItem, key);
            throw;
        }
        return new WaitResult(this, lockItem, key, acquired);
    }

    public async Task<WaitResult> WaitAsync(TKey key,
        int millisecondsTimeout = Timeout.Infinite,
        CancellationToken cancellationToken = default)
    {
        var lockItem = GetLockItem(key);
        bool acquired;
        try
        {
            acquired = await lockItem.Semaphore.WaitAsync(millisecondsTimeout,
                cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            ReleaseLockItem(lockItem, key);
            throw;
        }
        return new WaitResult(this, lockItem, key, acquired);
    }

    private LockItem GetLockItem(TKey key)
    {
        LockItem lockItem;
        lock (Locker)
        {
            if (!Dictionary.TryGetValue(key, out lockItem))
            {
                if (Pool.Count > 0)
                {
                    lockItem = Pool.Dequeue();
                }
                else
                {
                    lockItem = new LockItem();
                }
                Dictionary.Add(key, lockItem);
            }
            lockItem.UsedCount += 1;
        }
        return lockItem;
    }

    private void ReleaseLockItem(LockItem lockItem, TKey key)
    {
        lock (Locker)
        {
            lockItem.UsedCount -= 1;
            if (lockItem.UsedCount == 0)
            {
                if (Dictionary.TryGetValue(key, out var stored))
                {
                    if (stored == lockItem) // Sanity check
                    {
                        Dictionary.Remove(key);
                        if (Pool.Count < PoolSize)
                        {
                            Pool.Enqueue(lockItem);
                        }
                    }
                }
            }
        }
    }

    internal class LockItem
    {
        public SemaphoreSlim Semaphore { get; } = new SemaphoreSlim(1);
        public int UsedCount { get; set; }
    }

    public struct WaitResult : IDisposable
    {
        private MultiLock<TKey> MultiLock { get; }
        private LockItem LockItem { get; }
        private TKey Key { get; }

        public bool LockAcquired { get; }

        internal WaitResult(MultiLock<TKey> multiLock, LockItem lockItem, TKey key,
            bool acquired)
        {
            MultiLock = multiLock;
            LockItem = lockItem;
            Key = key;
            LockAcquired = acquired;
        }

        void IDisposable.Dispose()
        {
            MultiLock.ReleaseLockItem(LockItem, Key);
            LockItem.Semaphore.Release();
        }
    }
}

用法示例:

var multiLock = new MultiLock<string>();
using (await multiLock.WaitAsync("SomeKey"))
{
    //...
}

未使用信号量的默认池大小为 10。最佳值应该是使用 MultiLock 实例的并发工作人员的数量。

我在自己的PC上做了性能测试,10个worker每秒总共可以异步获取锁50万次(使用了20种不同的字符串标识)