在异步任务期间释放线程
Releasing threads during async tasks
我有一个系统生成大量子进程,这些子进程必须 运行 并行
- 请求的主线程将生成子进程并等待它们完成。
- 那些子进程做一些处理
- 然后与远程通话API
- 然后对 API
的结果进行更多处理
- 然后主线程在所有子进程完成(或超时)后继续
我们在线程数方面遇到了问题,因此我们希望通过在等待远程 API.
时尝试释放线程来减少活动线程的数量
最初我们使用 WebRequest.GetResponse()
进行 API 调用,自然会在等待 API.
时保持空闲线程
我们开始使用 EAP 模型(基于事件的异步编程......所有使用 IAsyncResult 的各种 .NET 方法),我们调用 BeginGetResponse(CallbackTrigger)
,返回 WaitHandle
s到主线程,然后触发 post-API 处理。
按照我们的理解,这意味着子进程线程终止,Callback由网卡级中断触发,触发新线程发起回调。也就是说,在我们等待 API 调用时,没有线程等待 运行 CallbackTrigger
。
如果大家能证实这个认识就好了?
我们现在正在考虑转向 TPL 模型(任务并行库...Task<T>
),使用 WebRequest.GetResponseAsync()
是 await
可行的。我的印象是,这是 await
\ async
所做的一部分...... await
在远程源等待时将控制传递回调用堆栈,如果我启动一堆 await
able Tasks
,然后调用 Tasks
.WaitAll
,这样就不会为每个任务占用一个线程 ,而该任务正在远程等待 API.
我理解正确了吗?
If people could confirm this understanding that would be good?
是的。请注意,IAsyncResult
/Begin*
/End*
模式是 APM,而不是 EAP。 EAP 将是 WebClient
的方法,其中 DownloadAsync
方法在完成时触发 DownloadCompleted
事件。
APM/EAP 是 hard 进行异步工作的方法,但实际上是异步的(意思是,它们不会占用线程只是为了阻塞 I/O 完成)。它们 "hard" 是因为它们使您的代码变得更加复杂 - 以至于大多数开发人员从未使用过它们而只是坚持使用同步代码。
Have I correctly understood this?
是的。通常,.NET 中的所有异步 I/O 都是使用作为线程池的一部分存在的单个 I/O 完成端口实现的。无论 API 是 APM、EAP 还是 TAP,都是如此。
使用 TAP 的 async
/await
的整个想法是核心 Task
s(就像从 GetResponseAsync
返回的那些)仍然建立在相同的异步之上I/O 系统,然后 async
/await
使消费它们更加愉快;您可以使用 await
的相同方法,而不是搞乱回调 (APM) 或事件处理程序 (EAP)。
作为一个有趣的旁注,Task
实际上实现了 IAsyncResult
,从高层次的角度来看,APM 和 TAP 非常相似(IAsyncResult
和 Task
表示一个操作 "in flight").
您应该会发现您的 TAP 代码比当前的 APM/EAP 代码简单得多(并且更易于维护!),并且性能没有明显变化。
(附带说明一下,考虑转移到 HttpClient
,它是从头开始设计的,考虑到了 TAP,而不是 HttpWebRequest
/WebClient
,它有 TAP固定在他们身上)。
然而...
I have a system that spawns a LOT of sub-processes that must run in parallel...
对于这种 "pipeline",您可能需要考虑转换为 TPL 数据流。 Dataflow 了解同步和异步 (TAP) 工作,并且内置了对节流的支持。数据流方法可以比 TAP 本身更进一步地简化您的代码。
根据@Stephen Cleary 的回答,我设置了一个简短的测试来进一步证明这一点。
下面的代码,当 运行 同步方法时,不修改 SetMinThreads,并且当目标网站需要几秒钟到 return 时,将为每个请求打开一个线程.它将显示越来越多的活动线程,它会立即启动前几个任务,但随后 "choke up" 当它达到 ThreadPool
的限制并且只允许每半秒启动一次新线程,或者当旧请求结束。
设置更高的 MinThreadCount 会按预期延迟问题。
保持 MinThread 计数未设置,但切换到异步 (APM) 方法或等待 (TAP) 方法会导致所有任务立即启动,并且任何时候活动的线程数都保持在较低水平。
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace LockTraceParser
{
internal class AsyncThreadsTester
{
public void Run()
{
//ThreadPool.SetMinThreads(100, 100);
Console.WriteLine("Beginning Test: ");
LogThreadCounts();
Test();
}
private void Test()
{
LogThreadCounts();
for (int i = 0; i < 65; i++)
{
//StartParallelUserWorkItem(i);
StartTask(i);
Thread.Sleep(100); //sleep a while so that the other thread is working
LogThreadCounts();
}
for (int i = 0; i < 40; i++)
{
Thread.Sleep(1100); //sleep a while so that the other thread is working
LogThreadCounts();
}
}
private void StartTask(int label)
{
var taskLabel = "Task " + label;
Console.WriteLine("Enqueue " + taskLabel);
Task.Run(() => GetResponseAwait(taskLabel));
}
private static void LogThreadCounts()
{
int worker;
int io;
ThreadPool.GetAvailableThreads(out worker, out io);
Console.WriteLine("Worker Threads Available:" + '\t' + worker + '\t' + "IO Threads Available:" + '\t' + io + '\t' +
"Threads held by Process: " + '\t' + Process.GetCurrentProcess().Threads.Count);
}
private void GetResponseSync(object label)
{
Console.WriteLine("Start Sync " + label);
try
{
var req = GetRequest();
using (var resp = req.GetResponse())
{
Console.WriteLine(resp.ContentLength);
}
}
catch (Exception e)
{
Console.WriteLine("Error response " + label);
}
Console.WriteLine("End response " + label);
}
private void BeginResponseAsync(object label)
{
Console.WriteLine("Start Async " + label);
try
{
var req = GetRequest();
req.BeginGetResponse(EndGetResponseAsync, req);
}
catch (Exception e)
{
Console.WriteLine("Error Async " + label);
}
}
private void EndGetResponseAsync(IAsyncResult result)
{
Console.WriteLine("Respond Async ");
var req = (WebRequest)result.AsyncState;
using (var resp = req.EndGetResponse(result))
{
Console.WriteLine(resp.ContentLength);
}
Console.WriteLine("End Async ");
}
private async Task GetResponseAwait(object label)
{
Console.WriteLine("Start Await " + label);
try
{
var req = GetRequest();
using (var resp = await req.GetResponseAsync())
{
Console.WriteLine(resp.ContentLength);
}
}
catch (Exception e)
{
Console.WriteLine("Error Await " + label);
}
Console.WriteLine("End Await " + label);
}
private WebRequest GetRequest()
{
var req = WebRequest.Create("http://aslowwebsite.com");
req.Timeout = (int)TimeSpan.FromSeconds(60).TotalMilliseconds;
return req;
}
}
}
我有一个系统生成大量子进程,这些子进程必须 运行 并行
- 请求的主线程将生成子进程并等待它们完成。
- 那些子进程做一些处理
- 然后与远程通话API
- 然后对 API 的结果进行更多处理
- 然后主线程在所有子进程完成(或超时)后继续
我们在线程数方面遇到了问题,因此我们希望通过在等待远程 API.
时尝试释放线程来减少活动线程的数量最初我们使用 WebRequest.GetResponse()
进行 API 调用,自然会在等待 API.
我们开始使用 EAP 模型(基于事件的异步编程......所有使用 IAsyncResult 的各种 .NET 方法),我们调用 BeginGetResponse(CallbackTrigger)
,返回 WaitHandle
s到主线程,然后触发 post-API 处理。
按照我们的理解,这意味着子进程线程终止,Callback由网卡级中断触发,触发新线程发起回调。也就是说,在我们等待 API 调用时,没有线程等待 运行 CallbackTrigger
。
如果大家能证实这个认识就好了?
我们现在正在考虑转向 TPL 模型(任务并行库...Task<T>
),使用 WebRequest.GetResponseAsync()
是 await
可行的。我的印象是,这是 await
\ async
所做的一部分...... await
在远程源等待时将控制传递回调用堆栈,如果我启动一堆 await
able Tasks
,然后调用 Tasks
.WaitAll
,这样就不会为每个任务占用一个线程 ,而该任务正在远程等待 API.
我理解正确了吗?
If people could confirm this understanding that would be good?
是的。请注意,IAsyncResult
/Begin*
/End*
模式是 APM,而不是 EAP。 EAP 将是 WebClient
的方法,其中 DownloadAsync
方法在完成时触发 DownloadCompleted
事件。
APM/EAP 是 hard 进行异步工作的方法,但实际上是异步的(意思是,它们不会占用线程只是为了阻塞 I/O 完成)。它们 "hard" 是因为它们使您的代码变得更加复杂 - 以至于大多数开发人员从未使用过它们而只是坚持使用同步代码。
Have I correctly understood this?
是的。通常,.NET 中的所有异步 I/O 都是使用作为线程池的一部分存在的单个 I/O 完成端口实现的。无论 API 是 APM、EAP 还是 TAP,都是如此。
使用 TAP 的 async
/await
的整个想法是核心 Task
s(就像从 GetResponseAsync
返回的那些)仍然建立在相同的异步之上I/O 系统,然后 async
/await
使消费它们更加愉快;您可以使用 await
的相同方法,而不是搞乱回调 (APM) 或事件处理程序 (EAP)。
作为一个有趣的旁注,Task
实际上实现了 IAsyncResult
,从高层次的角度来看,APM 和 TAP 非常相似(IAsyncResult
和 Task
表示一个操作 "in flight").
您应该会发现您的 TAP 代码比当前的 APM/EAP 代码简单得多(并且更易于维护!),并且性能没有明显变化。
(附带说明一下,考虑转移到 HttpClient
,它是从头开始设计的,考虑到了 TAP,而不是 HttpWebRequest
/WebClient
,它有 TAP固定在他们身上)。
然而...
I have a system that spawns a LOT of sub-processes that must run in parallel...
对于这种 "pipeline",您可能需要考虑转换为 TPL 数据流。 Dataflow 了解同步和异步 (TAP) 工作,并且内置了对节流的支持。数据流方法可以比 TAP 本身更进一步地简化您的代码。
根据@Stephen Cleary 的回答,我设置了一个简短的测试来进一步证明这一点。
下面的代码,当 运行 同步方法时,不修改 SetMinThreads,并且当目标网站需要几秒钟到 return 时,将为每个请求打开一个线程.它将显示越来越多的活动线程,它会立即启动前几个任务,但随后 "choke up" 当它达到 ThreadPool
的限制并且只允许每半秒启动一次新线程,或者当旧请求结束。
设置更高的 MinThreadCount 会按预期延迟问题。
保持 MinThread 计数未设置,但切换到异步 (APM) 方法或等待 (TAP) 方法会导致所有任务立即启动,并且任何时候活动的线程数都保持在较低水平。
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace LockTraceParser
{
internal class AsyncThreadsTester
{
public void Run()
{
//ThreadPool.SetMinThreads(100, 100);
Console.WriteLine("Beginning Test: ");
LogThreadCounts();
Test();
}
private void Test()
{
LogThreadCounts();
for (int i = 0; i < 65; i++)
{
//StartParallelUserWorkItem(i);
StartTask(i);
Thread.Sleep(100); //sleep a while so that the other thread is working
LogThreadCounts();
}
for (int i = 0; i < 40; i++)
{
Thread.Sleep(1100); //sleep a while so that the other thread is working
LogThreadCounts();
}
}
private void StartTask(int label)
{
var taskLabel = "Task " + label;
Console.WriteLine("Enqueue " + taskLabel);
Task.Run(() => GetResponseAwait(taskLabel));
}
private static void LogThreadCounts()
{
int worker;
int io;
ThreadPool.GetAvailableThreads(out worker, out io);
Console.WriteLine("Worker Threads Available:" + '\t' + worker + '\t' + "IO Threads Available:" + '\t' + io + '\t' +
"Threads held by Process: " + '\t' + Process.GetCurrentProcess().Threads.Count);
}
private void GetResponseSync(object label)
{
Console.WriteLine("Start Sync " + label);
try
{
var req = GetRequest();
using (var resp = req.GetResponse())
{
Console.WriteLine(resp.ContentLength);
}
}
catch (Exception e)
{
Console.WriteLine("Error response " + label);
}
Console.WriteLine("End response " + label);
}
private void BeginResponseAsync(object label)
{
Console.WriteLine("Start Async " + label);
try
{
var req = GetRequest();
req.BeginGetResponse(EndGetResponseAsync, req);
}
catch (Exception e)
{
Console.WriteLine("Error Async " + label);
}
}
private void EndGetResponseAsync(IAsyncResult result)
{
Console.WriteLine("Respond Async ");
var req = (WebRequest)result.AsyncState;
using (var resp = req.EndGetResponse(result))
{
Console.WriteLine(resp.ContentLength);
}
Console.WriteLine("End Async ");
}
private async Task GetResponseAwait(object label)
{
Console.WriteLine("Start Await " + label);
try
{
var req = GetRequest();
using (var resp = await req.GetResponseAsync())
{
Console.WriteLine(resp.ContentLength);
}
}
catch (Exception e)
{
Console.WriteLine("Error Await " + label);
}
Console.WriteLine("End Await " + label);
}
private WebRequest GetRequest()
{
var req = WebRequest.Create("http://aslowwebsite.com");
req.Timeout = (int)TimeSpan.FromSeconds(60).TotalMilliseconds;
return req;
}
}
}