Cancellable SemaphoreSlim - 你会建议对我的封装进行任何改进吗?
Cancellable SemaphoreSlim - Would you suggest any amelioration for my encapsulation?
出于 X 的原因,我需要编写一个信号量封装,以允许我取消 SemaphoreSlim
上所有等待的进程。 (SemaphoreSlim注销封装)
有我的class:
public class CancellableSemaphoreSlim
{
readonly Queue<CancellationTokenSource> tokens = new Queue<CancellationTokenSource>();
readonly SemaphoreSlim ss;
/// <summary>
/// Initializes a new instance of the <see cref="T:Eyes.Mobile.Core.Helpers.CancellableSemaphoreSlim"/> class.
/// </summary>
/// <param name="initialCount">Initial count.</param>
public CancellableSemaphoreSlim(int initialCount) { ss = new SemaphoreSlim(initialCount); }
/// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationToken" />. </summary>
/// <returns>A task that will complete when the semaphore has been entered. </returns>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
/// <exception cref="T:System.OperationCanceledException" />
public Task WaitAsync()
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
tokens.Enqueue(cancellationTokenSource);
return ss.WaitAsync(cancellationTokenSource.Token);
}
/// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationTokenSource" />. </summary>
/// <returns>A task that will complete when the semaphore has been entered. </returns>
/// <param name="cancellationTokenSource">The <see cref="T:System.Threading.CancellationToken" /> token to observe.</param>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
/// <exception cref="T:System.OperationCanceledException">
/// <paramref name="cancellationTokenSource" /> was canceled.
/// </exception>
public Task WaitAsync(CancellationToken cancellationToken)
{
CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
tokens.Enqueue(cancellationTokenSource);
return ss.WaitAsync(cancellationTokenSource.Token);
}
/// <summary>
/// Release this instance.
/// </summary>
/// <returns>The released semaphore return.</returns>
public int Release() => ss.Release();
/// <summary>
/// Cancel all processus currently in WaitAsync() state.
/// </summary>
public void CancelAll()
{
while (tokens.Count > 0)
{
CancellationTokenSource token = tokens.Dequeue();
if (!token.IsCancellationRequested)
token.Cancel();
}
}
}
你可以像基本一样使用它SemaphoreSlim
,我写了一个简单的示例:
class Program
{
static void Main(string[] args)
{
AsyncContext.Run(() => MainAsync(args));
}
static async void MainAsync(string[] args)
{
for (int i = 0; i < 5; i++)
{
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(10000);
await Task.WhenAll(
MakeAnAction(i, cancellationTokenSource),
MakeAnAction(i, cancellationTokenSource),
MakeAnAction(i, cancellationTokenSource),
MakeAnAction(i, cancellationTokenSource),
MakeAnAction(i, cancellationTokenSource)
);
}
catch (OperationCanceledException) { }
}
await Task.Delay(5000);
cancellableSemaphoreSlim.CancelAll();
await Task.Delay(5000);
}
readonly static CancellableSemaphoreSlim cancellableSemaphoreSlim = new CancellableSemaphoreSlim(1);
readonly static Random rnd = new Random();
internal static async Task MakeAnAction(int id, CancellationTokenSource cancellationTokenSource)
{
try
{
await cancellableSemaphoreSlim.WaitAsync(cancellationTokenSource.Token);
int actionTime = rnd.Next(2, 10) * 1000;
Output($"{id} : Start ({actionTime})");
await Task.Delay(actionTime, cancellationTokenSource.Token);
Output($"{id} : OK ({actionTime})");
}
catch (OperationCanceledException)
{
Output($"{id} : Cancelled");
}
finally
{
cancellableSemaphoreSlim.Release();
}
}
private static void Output(string str)
{
Debug.WriteLine(str);
Console.WriteLine(str);
}
}
但是,我想知道使用 Queue<CancellationTokenSource>
是否会产生任何异步问题?因为,如果我们有一个可以被不同的 Threads/Tasks 调用的方法(类似 makeAnAction),如果 CancelAll() 在新的 Task/Thread 调用 makeAnAction 之前被调用,这意味着这个将被添加到队列中,实际上它的所有项目都已出队..
我曾考虑尝试使用 CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)
在我所有的取消标记之间创建一个唯一的 link。然而,即使它是可变参数逻辑 (params
),它会产生同样的问题吗?
我只是想以不会失败的方式实现它,但我想我目前的方法很糟糕,所以我只是想知道是否有人可以给我一个观点对此封装及其逻辑有何看法?
如果您认为某些内容不符合逻辑,请随时给我任何建议 :)
最大
编辑 1
然后我编辑了代码以跟进与@NthDeveloper 的讨论。我尝试添加 lock system
public class CancellableSemaphoreSlim
{
object _syncObj = new object();
readonly Queue<CancellationTokenSource> tokens = new Queue<CancellationTokenSource>();
readonly SemaphoreSlim ss;
/// <summary>
/// Initializes a new instance of the <see cref="T:Eyes.Mobile.Core.Helpers.CancellableSemaphoreSlim"/> class.
/// </summary>
/// <param name="initialCount">Initial count.</param>
public CancellableSemaphoreSlim(int initialCount) { ss = new SemaphoreSlim(initialCount); }
/// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationToken" />. </summary>
/// <returns>A task that will complete when the semaphore has been entered. </returns>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
/// <exception cref="T:System.OperationCanceledException" />
public Task WaitAsync()
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
lock (_syncObj)
{
tokens.Enqueue(cancellationTokenSource);
}
return ss.WaitAsync(cancellationTokenSource.Token);
}
/// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationTokenSource" />. </summary>
/// <returns>A task that will complete when the semaphore has been entered. </returns>
/// <param name="cancellationTokenSource">The <see cref="T:System.Threading.CancellationToken" /> token to observe.</param>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
/// <exception cref="T:System.OperationCanceledException">
/// <paramref name="cancellationTokenSource" /> was canceled.
/// </exception>
public Task WaitAsync(CancellationToken cancellationToken)
{
CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
lock (_syncObj)
{
tokens.Enqueue(cancellationTokenSource);
}
return ss.WaitAsync(cancellationTokenSource.Token);
}
/// <summary>
/// Release this instance.
/// </summary>
/// <returns>The released semaphore return.</returns>
public int Release() => ss.Release();
/// <summary>
/// Cancel all processus currently in WaitAsync() state.
/// </summary>
public void CancelAll()
{
lock (_syncObj)
{
while (tokens.Count > 0)
{
CancellationTokenSource token = tokens.Dequeue();
if (!token.IsCancellationRequested)
token.Cancel();
}
}
}
}
示例线程安全 class 保护其内部列表免受同时更改,并防止 class 在处理后使用。
public class SampleThreadSafeDisposableClass: IDisposable
{
bool _isDisposed;
object _syncObj = new object();
List<object> _list = new List<object>();
public void Add(object obj)
{
lock(_syncObj)
{
if (_isDisposed)
return;
_list.Add(obj);
}
}
//This method can be Dispose/Clear/CancelAll
public void Dispose()
{
lock (_syncObj)
{
if (_isDisposed)
return;
_isDisposed = true;
_list.Clear();
}
}
}
希望对您有所帮助。
我认为你可以通过只使用一个 CancellationSource
来简化代码,它在 CancelAll
:
中被触发并与一个新的交换
public sealed class CancellableSemaphoreSlim : IDisposable
{
private CancellationTokenSource cancelSource = new();
private readonly SemaphoreSlim semaphoreSlim;
public CancellableSemaphoreSlim(int initialCount) => semaphoreSlim = new SemaphoreSlim(initialCount);
public Task WaitAsync() => semaphoreSlim.WaitAsync(cancelSource.Token);
public async Task WaitAsync(CancellationToken cancellationToken)
{
// This operation will cancel when either the user token or our cancelSource signal cancellation
using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancelSource.Token, cancellationToken);
await semaphoreSlim.WaitAsync(linkedSource.Token);
}
public int Release() => semaphoreSlim.Release();
public void CancelAll()
{
using var currentCancelSource = Interlocked.Exchange(ref cancelSource, new CancellationTokenSource());
currentCancelSource.Cancel();
}
public void Dispose()
{
cancelSource.Dispose();
semaphoreSlim.Dispose();
}
}
总会有一场竞赛来确定 WaitAsync
是否被同时调用 CancelAll
运行 取消。
在这个版本中,它只取决于在 WaitAsync()
中是旧的还是新的 cancelSource.Token
被抓取。
出于 X 的原因,我需要编写一个信号量封装,以允许我取消 SemaphoreSlim
上所有等待的进程。 (SemaphoreSlim注销封装)
有我的class:
public class CancellableSemaphoreSlim
{
readonly Queue<CancellationTokenSource> tokens = new Queue<CancellationTokenSource>();
readonly SemaphoreSlim ss;
/// <summary>
/// Initializes a new instance of the <see cref="T:Eyes.Mobile.Core.Helpers.CancellableSemaphoreSlim"/> class.
/// </summary>
/// <param name="initialCount">Initial count.</param>
public CancellableSemaphoreSlim(int initialCount) { ss = new SemaphoreSlim(initialCount); }
/// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationToken" />. </summary>
/// <returns>A task that will complete when the semaphore has been entered. </returns>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
/// <exception cref="T:System.OperationCanceledException" />
public Task WaitAsync()
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
tokens.Enqueue(cancellationTokenSource);
return ss.WaitAsync(cancellationTokenSource.Token);
}
/// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationTokenSource" />. </summary>
/// <returns>A task that will complete when the semaphore has been entered. </returns>
/// <param name="cancellationTokenSource">The <see cref="T:System.Threading.CancellationToken" /> token to observe.</param>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
/// <exception cref="T:System.OperationCanceledException">
/// <paramref name="cancellationTokenSource" /> was canceled.
/// </exception>
public Task WaitAsync(CancellationToken cancellationToken)
{
CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
tokens.Enqueue(cancellationTokenSource);
return ss.WaitAsync(cancellationTokenSource.Token);
}
/// <summary>
/// Release this instance.
/// </summary>
/// <returns>The released semaphore return.</returns>
public int Release() => ss.Release();
/// <summary>
/// Cancel all processus currently in WaitAsync() state.
/// </summary>
public void CancelAll()
{
while (tokens.Count > 0)
{
CancellationTokenSource token = tokens.Dequeue();
if (!token.IsCancellationRequested)
token.Cancel();
}
}
}
你可以像基本一样使用它SemaphoreSlim
,我写了一个简单的示例:
class Program
{
static void Main(string[] args)
{
AsyncContext.Run(() => MainAsync(args));
}
static async void MainAsync(string[] args)
{
for (int i = 0; i < 5; i++)
{
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(10000);
await Task.WhenAll(
MakeAnAction(i, cancellationTokenSource),
MakeAnAction(i, cancellationTokenSource),
MakeAnAction(i, cancellationTokenSource),
MakeAnAction(i, cancellationTokenSource),
MakeAnAction(i, cancellationTokenSource)
);
}
catch (OperationCanceledException) { }
}
await Task.Delay(5000);
cancellableSemaphoreSlim.CancelAll();
await Task.Delay(5000);
}
readonly static CancellableSemaphoreSlim cancellableSemaphoreSlim = new CancellableSemaphoreSlim(1);
readonly static Random rnd = new Random();
internal static async Task MakeAnAction(int id, CancellationTokenSource cancellationTokenSource)
{
try
{
await cancellableSemaphoreSlim.WaitAsync(cancellationTokenSource.Token);
int actionTime = rnd.Next(2, 10) * 1000;
Output($"{id} : Start ({actionTime})");
await Task.Delay(actionTime, cancellationTokenSource.Token);
Output($"{id} : OK ({actionTime})");
}
catch (OperationCanceledException)
{
Output($"{id} : Cancelled");
}
finally
{
cancellableSemaphoreSlim.Release();
}
}
private static void Output(string str)
{
Debug.WriteLine(str);
Console.WriteLine(str);
}
}
但是,我想知道使用 Queue<CancellationTokenSource>
是否会产生任何异步问题?因为,如果我们有一个可以被不同的 Threads/Tasks 调用的方法(类似 makeAnAction),如果 CancelAll() 在新的 Task/Thread 调用 makeAnAction 之前被调用,这意味着这个将被添加到队列中,实际上它的所有项目都已出队..
我曾考虑尝试使用 CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)
在我所有的取消标记之间创建一个唯一的 link。然而,即使它是可变参数逻辑 (params
),它会产生同样的问题吗?
我只是想以不会失败的方式实现它,但我想我目前的方法很糟糕,所以我只是想知道是否有人可以给我一个观点对此封装及其逻辑有何看法?
如果您认为某些内容不符合逻辑,请随时给我任何建议 :)
最大
编辑 1
然后我编辑了代码以跟进与@NthDeveloper 的讨论。我尝试添加 lock system
public class CancellableSemaphoreSlim
{
object _syncObj = new object();
readonly Queue<CancellationTokenSource> tokens = new Queue<CancellationTokenSource>();
readonly SemaphoreSlim ss;
/// <summary>
/// Initializes a new instance of the <see cref="T:Eyes.Mobile.Core.Helpers.CancellableSemaphoreSlim"/> class.
/// </summary>
/// <param name="initialCount">Initial count.</param>
public CancellableSemaphoreSlim(int initialCount) { ss = new SemaphoreSlim(initialCount); }
/// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationToken" />. </summary>
/// <returns>A task that will complete when the semaphore has been entered. </returns>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
/// <exception cref="T:System.OperationCanceledException" />
public Task WaitAsync()
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
lock (_syncObj)
{
tokens.Enqueue(cancellationTokenSource);
}
return ss.WaitAsync(cancellationTokenSource.Token);
}
/// <summary>Asynchronously waits to enter the <see cref="T:System.Threading.SemaphoreSlim" />, while observing a <see cref="T:System.Threading.CancellationTokenSource" />. </summary>
/// <returns>A task that will complete when the semaphore has been entered. </returns>
/// <param name="cancellationTokenSource">The <see cref="T:System.Threading.CancellationToken" /> token to observe.</param>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been disposed.</exception>
/// <exception cref="T:System.OperationCanceledException">
/// <paramref name="cancellationTokenSource" /> was canceled.
/// </exception>
public Task WaitAsync(CancellationToken cancellationToken)
{
CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
lock (_syncObj)
{
tokens.Enqueue(cancellationTokenSource);
}
return ss.WaitAsync(cancellationTokenSource.Token);
}
/// <summary>
/// Release this instance.
/// </summary>
/// <returns>The released semaphore return.</returns>
public int Release() => ss.Release();
/// <summary>
/// Cancel all processus currently in WaitAsync() state.
/// </summary>
public void CancelAll()
{
lock (_syncObj)
{
while (tokens.Count > 0)
{
CancellationTokenSource token = tokens.Dequeue();
if (!token.IsCancellationRequested)
token.Cancel();
}
}
}
}
示例线程安全 class 保护其内部列表免受同时更改,并防止 class 在处理后使用。
public class SampleThreadSafeDisposableClass: IDisposable
{
bool _isDisposed;
object _syncObj = new object();
List<object> _list = new List<object>();
public void Add(object obj)
{
lock(_syncObj)
{
if (_isDisposed)
return;
_list.Add(obj);
}
}
//This method can be Dispose/Clear/CancelAll
public void Dispose()
{
lock (_syncObj)
{
if (_isDisposed)
return;
_isDisposed = true;
_list.Clear();
}
}
}
希望对您有所帮助。
我认为你可以通过只使用一个 CancellationSource
来简化代码,它在 CancelAll
:
public sealed class CancellableSemaphoreSlim : IDisposable
{
private CancellationTokenSource cancelSource = new();
private readonly SemaphoreSlim semaphoreSlim;
public CancellableSemaphoreSlim(int initialCount) => semaphoreSlim = new SemaphoreSlim(initialCount);
public Task WaitAsync() => semaphoreSlim.WaitAsync(cancelSource.Token);
public async Task WaitAsync(CancellationToken cancellationToken)
{
// This operation will cancel when either the user token or our cancelSource signal cancellation
using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancelSource.Token, cancellationToken);
await semaphoreSlim.WaitAsync(linkedSource.Token);
}
public int Release() => semaphoreSlim.Release();
public void CancelAll()
{
using var currentCancelSource = Interlocked.Exchange(ref cancelSource, new CancellationTokenSource());
currentCancelSource.Cancel();
}
public void Dispose()
{
cancelSource.Dispose();
semaphoreSlim.Dispose();
}
}
总会有一场竞赛来确定 WaitAsync
是否被同时调用 CancelAll
运行 取消。
在这个版本中,它只取决于在 WaitAsync()
中是旧的还是新的 cancelSource.Token
被抓取。