如何在 .NET Core 3.1 中使用命名互斥锁和 async/await?
How to use named mutexes and async/await in .NET Core 3.1?
我正在为我的 ASP.NET Core 3.1 Web API.
实现一个缓存层
开始实施
public interface ICache
{
T Get<T>(string key);
void Set<T>(string key, T value);
}
public static class ICacheExtensions
{
public static T GetOrCreate<T>(this ICache cache, string key, Func<T> factory)
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = factory();
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
return value;
}
public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = await factory().ConfigureAwait(false);
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
return value;
}
}
这工作正常,但我试图解决的一个已知问题是它容易受到缓存踩踏的影响。如果我的 API 正在处理许多请求,这些请求都试图同时使用 GetOrCreate 方法之一访问相同的键,那么它们将每个 运行 一个工厂函数的并行实例。这意味着冗余工作和资源浪费。
我试图做的是引入互斥锁以确保每个缓存键只有一个工厂函数实例可以 运行。
引入互斥体
public interface ICache
{
T Get<T>(string key);
void Set<T>(string key, T value);
}
public static class ICacheExtensions
{
public static T GetOrCreate<T>(this ICache cache, string key, Func<T> factory)
{
using var mutex = new Mutex(false, key);
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
mutex.WaitOne();
try
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = factory();
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
}
finally
{
mutex.ReleaseMutex();
}
}
return value;
}
public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
using var mutex = new Mutex(false, key);
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
mutex.WaitOne();
try
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = await factory().ConfigureAwait(false);
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
}
finally
{
mutex.ReleaseMutex();
}
}
return value;
}
}
这对 GetOrCreate()
非常有效,但 GetOrCreateAsync()
会引发异常。事实证明,互斥量是线程绑定的,因此如果 WaitOne()
和 ReleaseMutex()
在不同的线程上调用(async/await 往往会发生),互斥量不喜欢这样并抛出异常。我发现 this other SO question 描述了一些解决方法,因此决定使用自定义任务计划程序。 SingleThreadedTaskScheduler
使用仅包含一个线程的线程池来安排任务。我打算仅从该线程与互斥量进行交互。
单线程任务调度程序
internal sealed class SingleThreadedTaskScheduler : TaskScheduler, IDisposable
{
private readonly Thread _thread;
private BlockingCollection<Task> _tasks;
public SingleThreadedTaskScheduler()
{
_tasks = new BlockingCollection<Task>();
_thread = new Thread(() =>
{
foreach (var t in _tasks.GetConsumingEnumerable())
{
TryExecuteTask(t);
}
});
_thread.IsBackground = true;
_thread.Start();
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks.ToArray();
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
public void Dispose()
{
_tasks?.CompleteAdding();
_thread?.Join();
_tasks?.Dispose();
_tasks = null;
}
}
GetOrCreateAsync 与 SingleThreadedTaskScheduler
private static readonly TaskScheduler _mutexTaskScheduler = new SingleThreadedTaskScheduler();
public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
using var mutex = new Mutex(false, key);
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
await Task.Factory
.StartNew(() => mutex.WaitOne(), CancellationToken.None, TaskCreationOptions.None, _mutexTaskScheduler)
.ConfiureAwait(false);
try
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = await factory().ConfigureAwait(false);
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
}
finally
{
await Task.Factory
.StartNew(() => mutex.ReleaseMutex(), CancellationToken.None, TaskCreationOptions.None, _mutexTaskScheduler)
.ConfiureAwait(false);
}
}
return value;
}
通过此实现,异常得到解决,但GetOrCreateAsync
在缓存踩踏场景中仍然多次调用工厂函数。我错过了什么吗?
我也尝试过使用 SemaphoreSlim
而不是 Mutex
,这应该与 async/await 一起玩得更好。这里的问题是 Linux 不支持命名信号量,所以我必须将所有信号量保存在一个 Dictionary<string, SemaphoreSlim>
中,这样管理起来太麻烦了。
linked solution 仅在使用命名互斥锁同步异步代码跨进程 时有效。在 same 进程中同步代码是行不通的。互斥锁允许递归获取,因此通过在同一个线程上移动所有获取,就好像互斥锁根本不存在一样。
I'd have to keep all my semaphores in a Dictionary<string, SemaphoreSlim> and that would be too cumbersome to manage.
如果您需要一个非递归的命名互斥体,名为 Semaphore
s(不适用于 Linux)或管理您自己的字典确实是唯一的方法。
我有一个 AsyncCache<T>
我一直在努力,但还没有准备好。它试图看起来像Task<T>
个实例的缓存,但实际上是TaskCompletionSource<T>
个实例的缓存。
使用信号量似乎有效。感谢 Stephen Cleary 确认这是一条比 Mutexes 更好的路线。
public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
using var mutex = new Mutex(false, key);
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
WaitOne(key);
try
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = await factory().ConfigureAwait(false);
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
ReleaseAll(key);
}
catch (Exception)
{
ReleaseOne(key);
throw;
}
}
return value;
}
private static readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphores = new ConcurrentDictionary<string, SemaphoreSlim>();
private static void WaitOne(string key)
{
var semaphore = _semaphores.GetOrAdd(key, k => new SemaphoreSlim(1, int.MaxValue));
semaphore.Wait();
}
private static void ReleaseOne(string key)
{
var semaphore = _semaphores.GetOrAdd(key, k => new SemaphoreSlim(0, int.MaxValue));
semaphore.Release();
}
private static void ReleaseAll(string key)
{
var semaphore = default(SemaphoreSlim);
_semaphores.Remove(key, out semaphore);
semaphore?.Release(int.MaxValue);
semaphore?.Dispose();
}
我正在为我的 ASP.NET Core 3.1 Web API.
实现一个缓存层开始实施
public interface ICache
{
T Get<T>(string key);
void Set<T>(string key, T value);
}
public static class ICacheExtensions
{
public static T GetOrCreate<T>(this ICache cache, string key, Func<T> factory)
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = factory();
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
return value;
}
public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = await factory().ConfigureAwait(false);
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
return value;
}
}
这工作正常,但我试图解决的一个已知问题是它容易受到缓存踩踏的影响。如果我的 API 正在处理许多请求,这些请求都试图同时使用 GetOrCreate 方法之一访问相同的键,那么它们将每个 运行 一个工厂函数的并行实例。这意味着冗余工作和资源浪费。
我试图做的是引入互斥锁以确保每个缓存键只有一个工厂函数实例可以 运行。
引入互斥体
public interface ICache
{
T Get<T>(string key);
void Set<T>(string key, T value);
}
public static class ICacheExtensions
{
public static T GetOrCreate<T>(this ICache cache, string key, Func<T> factory)
{
using var mutex = new Mutex(false, key);
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
mutex.WaitOne();
try
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = factory();
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
}
finally
{
mutex.ReleaseMutex();
}
}
return value;
}
public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
using var mutex = new Mutex(false, key);
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
mutex.WaitOne();
try
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = await factory().ConfigureAwait(false);
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
}
finally
{
mutex.ReleaseMutex();
}
}
return value;
}
}
这对 GetOrCreate()
非常有效,但 GetOrCreateAsync()
会引发异常。事实证明,互斥量是线程绑定的,因此如果 WaitOne()
和 ReleaseMutex()
在不同的线程上调用(async/await 往往会发生),互斥量不喜欢这样并抛出异常。我发现 this other SO question 描述了一些解决方法,因此决定使用自定义任务计划程序。 SingleThreadedTaskScheduler
使用仅包含一个线程的线程池来安排任务。我打算仅从该线程与互斥量进行交互。
单线程任务调度程序
internal sealed class SingleThreadedTaskScheduler : TaskScheduler, IDisposable
{
private readonly Thread _thread;
private BlockingCollection<Task> _tasks;
public SingleThreadedTaskScheduler()
{
_tasks = new BlockingCollection<Task>();
_thread = new Thread(() =>
{
foreach (var t in _tasks.GetConsumingEnumerable())
{
TryExecuteTask(t);
}
});
_thread.IsBackground = true;
_thread.Start();
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks.ToArray();
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
public void Dispose()
{
_tasks?.CompleteAdding();
_thread?.Join();
_tasks?.Dispose();
_tasks = null;
}
}
GetOrCreateAsync 与 SingleThreadedTaskScheduler
private static readonly TaskScheduler _mutexTaskScheduler = new SingleThreadedTaskScheduler();
public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
using var mutex = new Mutex(false, key);
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
await Task.Factory
.StartNew(() => mutex.WaitOne(), CancellationToken.None, TaskCreationOptions.None, _mutexTaskScheduler)
.ConfiureAwait(false);
try
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = await factory().ConfigureAwait(false);
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
}
finally
{
await Task.Factory
.StartNew(() => mutex.ReleaseMutex(), CancellationToken.None, TaskCreationOptions.None, _mutexTaskScheduler)
.ConfiureAwait(false);
}
}
return value;
}
通过此实现,异常得到解决,但GetOrCreateAsync
在缓存踩踏场景中仍然多次调用工厂函数。我错过了什么吗?
我也尝试过使用 SemaphoreSlim
而不是 Mutex
,这应该与 async/await 一起玩得更好。这里的问题是 Linux 不支持命名信号量,所以我必须将所有信号量保存在一个 Dictionary<string, SemaphoreSlim>
中,这样管理起来太麻烦了。
linked solution 仅在使用命名互斥锁同步异步代码跨进程 时有效。在 same 进程中同步代码是行不通的。互斥锁允许递归获取,因此通过在同一个线程上移动所有获取,就好像互斥锁根本不存在一样。
I'd have to keep all my semaphores in a Dictionary<string, SemaphoreSlim> and that would be too cumbersome to manage.
如果您需要一个非递归的命名互斥体,名为 Semaphore
s(不适用于 Linux)或管理您自己的字典确实是唯一的方法。
我有一个 AsyncCache<T>
我一直在努力,但还没有准备好。它试图看起来像Task<T>
个实例的缓存,但实际上是TaskCompletionSource<T>
个实例的缓存。
使用信号量似乎有效。感谢 Stephen Cleary 确认这是一条比 Mutexes 更好的路线。
public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
using var mutex = new Mutex(false, key);
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
WaitOne(key);
try
{
var value = cache.Get<T>(key);
if (EqualityComparer<T>.Default.Equals(value, default(T)))
{
value = await factory().ConfigureAwait(false);
if (!EqualityComparer<T>.Default.Equals(value, default(T)))
{
cache.Set(key, value);
}
}
ReleaseAll(key);
}
catch (Exception)
{
ReleaseOne(key);
throw;
}
}
return value;
}
private static readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphores = new ConcurrentDictionary<string, SemaphoreSlim>();
private static void WaitOne(string key)
{
var semaphore = _semaphores.GetOrAdd(key, k => new SemaphoreSlim(1, int.MaxValue));
semaphore.Wait();
}
private static void ReleaseOne(string key)
{
var semaphore = _semaphores.GetOrAdd(key, k => new SemaphoreSlim(0, int.MaxValue));
semaphore.Release();
}
private static void ReleaseAll(string key)
{
var semaphore = default(SemaphoreSlim);
_semaphores.Remove(key, out semaphore);
semaphore?.Release(int.MaxValue);
semaphore?.Dispose();
}