用于单线程异步异步等待式编程的信号量
Semaphores for single-threaded asynchronous async-await-style programming
信号量是一种多线程锁定机制,可确保在给定资源上只有有限数量的线程 运行。互斥锁是一种特殊情况,其中有限数量是一个。
异步编程与多线程编程有很多共同点(有时与之相关),尽管它本身并不是多线程的。
以下代码创建十个任务,它们只需等待一秒钟并记录它们的开始和结束。
所有这些都只在一个线程上执行(我假设适当的同步上下文维护就位,例如在 WPF 中就是这种情况)。
因此,即使只有一个线程,我们也有 "parallel" 个任务,并且在某些用例中,人们希望将对资源的访问限制为仅少数或其中一项任务。 (例如,限制并行网络请求。)
看来需要 "async semaphore" - 一个锁定异步延续而非线程的概念。
我已经实现了这样一个信号量来检查它是否真的有意义并阐明我的意思。
我的问题是:这个东西是否已经可用,最好是在 .NET 框架本身中?我找不到任何东西,虽然在我看来它应该存在。
所以这是代码 (LINQPad share here):
async void Main()
{
// Necessary in LINQPad to ensure a single thread.
// Other environments such as WPF do this for you.
SynchronizationContext.SetSynchronizationContext(
new DispatcherSynchronizationContext());
var tasks = Enumerable.Range(1, 10).Select(SampleWork).ToArray();
await Task.WhenAll(tasks);
"All done.".Dump();
}
AsyncSemaphore commonSemaphore = new AsyncSemaphore(4);
async Task SampleWork(Int32 i)
{
using (await commonSemaphore.Acquire())
{
$"Beginning work #{i} {Thread.CurrentThread.ManagedThreadId}".Dump();
await Task.Delay(TimeSpan.FromSeconds(1));
$"Finished work #{i} {Thread.CurrentThread.ManagedThreadId}".Dump();
}
}
public class AsyncSemaphore
{
Int32 maxTasks;
Int32 currentTasks;
ReleasingDisposable release;
Queue<TaskCompletionSource<Object>> continuations
= new Queue<TaskCompletionSource<Object>>();
public AsyncSemaphore(Int32 maxTasks = 1)
{
this.maxTasks = maxTasks;
release = new ReleasingDisposable(this);
}
public async Task<IDisposable> Acquire()
{
++currentTasks;
if (currentTasks > maxTasks)
{
var tcs = new TaskCompletionSource<Object>();
continuations.Enqueue(tcs);
await tcs.Task;
}
return release;
}
void Release()
{
--currentTasks;
if (continuations.Count > 0)
{
var tcs = continuations.Dequeue();
tcs.SetResult(null);
}
}
class ReleasingDisposable : IDisposable
{
AsyncSemaphore self;
public ReleasingDisposable(AsyncSemaphore self) => this.self = self;
public void Dispose() => self.Release();
}
}
我得到这个输出:
Beginning work #1 1
Beginning work #2 1
Beginning work #3 1
Beginning work #4 1
Finished work #4 1
Finished work #3 1
Finished work #2 1
Finished work #1 1
Beginning work #5 1
Beginning work #6 1
Beginning work #7 1
Beginning work #8 1
Finished work #5 1
Beginning work #9 1
Finished work #8 1
Finished work #7 1
Finished work #6 1
Beginning work #10 1
Finished work #9 1
Finished work #10 1
All done.
所以确实,我最多有 4 个任务 运行,并且都在同一个线程上 运行。
So even though there's only one thread we have "parallel" tasks
我通常更喜欢术语 "concurrent" 只是为了避免与 Parallel
/ Parallel LINQ 混淆。
My question would be: Is this thing available already, ideally in the .NET framework itself?
是的。 SemaphoreSlim
是一个信号量,可以同步使用 或 异步使用。
我也有一个full suite of asynchronous coordination primitives on NuGet, inspired by Stephen Toub's blog post series on the subject。我的原语都是同步和异步兼容的(和线程安全的),这在例如资源的一个用户是同步的而其他用户是异步的情况下很有用。
信号量是一种多线程锁定机制,可确保在给定资源上只有有限数量的线程 运行。互斥锁是一种特殊情况,其中有限数量是一个。
异步编程与多线程编程有很多共同点(有时与之相关),尽管它本身并不是多线程的。
以下代码创建十个任务,它们只需等待一秒钟并记录它们的开始和结束。
所有这些都只在一个线程上执行(我假设适当的同步上下文维护就位,例如在 WPF 中就是这种情况)。
因此,即使只有一个线程,我们也有 "parallel" 个任务,并且在某些用例中,人们希望将对资源的访问限制为仅少数或其中一项任务。 (例如,限制并行网络请求。)
看来需要 "async semaphore" - 一个锁定异步延续而非线程的概念。
我已经实现了这样一个信号量来检查它是否真的有意义并阐明我的意思。
我的问题是:这个东西是否已经可用,最好是在 .NET 框架本身中?我找不到任何东西,虽然在我看来它应该存在。
所以这是代码 (LINQPad share here):
async void Main()
{
// Necessary in LINQPad to ensure a single thread.
// Other environments such as WPF do this for you.
SynchronizationContext.SetSynchronizationContext(
new DispatcherSynchronizationContext());
var tasks = Enumerable.Range(1, 10).Select(SampleWork).ToArray();
await Task.WhenAll(tasks);
"All done.".Dump();
}
AsyncSemaphore commonSemaphore = new AsyncSemaphore(4);
async Task SampleWork(Int32 i)
{
using (await commonSemaphore.Acquire())
{
$"Beginning work #{i} {Thread.CurrentThread.ManagedThreadId}".Dump();
await Task.Delay(TimeSpan.FromSeconds(1));
$"Finished work #{i} {Thread.CurrentThread.ManagedThreadId}".Dump();
}
}
public class AsyncSemaphore
{
Int32 maxTasks;
Int32 currentTasks;
ReleasingDisposable release;
Queue<TaskCompletionSource<Object>> continuations
= new Queue<TaskCompletionSource<Object>>();
public AsyncSemaphore(Int32 maxTasks = 1)
{
this.maxTasks = maxTasks;
release = new ReleasingDisposable(this);
}
public async Task<IDisposable> Acquire()
{
++currentTasks;
if (currentTasks > maxTasks)
{
var tcs = new TaskCompletionSource<Object>();
continuations.Enqueue(tcs);
await tcs.Task;
}
return release;
}
void Release()
{
--currentTasks;
if (continuations.Count > 0)
{
var tcs = continuations.Dequeue();
tcs.SetResult(null);
}
}
class ReleasingDisposable : IDisposable
{
AsyncSemaphore self;
public ReleasingDisposable(AsyncSemaphore self) => this.self = self;
public void Dispose() => self.Release();
}
}
我得到这个输出:
Beginning work #1 1
Beginning work #2 1
Beginning work #3 1
Beginning work #4 1
Finished work #4 1
Finished work #3 1
Finished work #2 1
Finished work #1 1
Beginning work #5 1
Beginning work #6 1
Beginning work #7 1
Beginning work #8 1
Finished work #5 1
Beginning work #9 1
Finished work #8 1
Finished work #7 1
Finished work #6 1
Beginning work #10 1
Finished work #9 1
Finished work #10 1
All done.
所以确实,我最多有 4 个任务 运行,并且都在同一个线程上 运行。
So even though there's only one thread we have "parallel" tasks
我通常更喜欢术语 "concurrent" 只是为了避免与 Parallel
/ Parallel LINQ 混淆。
My question would be: Is this thing available already, ideally in the .NET framework itself?
是的。 SemaphoreSlim
是一个信号量,可以同步使用 或 异步使用。
我也有一个full suite of asynchronous coordination primitives on NuGet, inspired by Stephen Toub's blog post series on the subject。我的原语都是同步和异步兼容的(和线程安全的),这在例如资源的一个用户是同步的而其他用户是异步的情况下很有用。