在异步任务期间释放线程

Releasing threads during async tasks

我有一个系统生成大量子进程,这些子进程必须 运行 并行

我们在线程数方面遇到了问题,因此我们希望通过在等待远程 API.

时尝试释放线程来减少活动线程的数量

最初我们使用 WebRequest.GetResponse() 进行 API 调用,自然会在等待 API.

时保持空闲线程

我们开始使用 EAP 模型(基于事件的异步编程......所有使用 IAsyncResult 的各种 .NET 方法),我们调用 BeginGetResponse(CallbackTrigger),返回 WaitHandles到主线程,然后触发 post-API 处理。

按照我们的理解,这意味着子进程线程终止,Callback由网卡级中断触发,触发新线程发起回调。也就是说,在我们等待 API 调用时,没有线程等待 运行 CallbackTrigger

如果大家能证实这个认识就好了?

我们现在正在考虑转向 TPL 模型(任务并行库...Task<T>),使用 WebRequest.GetResponseAsync()await 可行的。我的印象是,这是 await\ async 所做的一部分...... await 在远程源等待时将控制传递回调用堆栈,如果我启动一堆 awaitable Tasks,然后调用 Tasks.WaitAll,这样就不会为每个任务占用一个线程 ,而该任务正在远程等待 API.

我理解正确了吗?

If people could confirm this understanding that would be good?

是的。请注意,IAsyncResult/Begin*/End* 模式是 APM,而不是 EAP。 EAP 将是 WebClient 的方法,其中 DownloadAsync 方法在完成时触发 DownloadCompleted 事件。

APM/EAP 是 hard 进行异步工作的方法,但实际上是异步的(意思是,它们不会占用线程只是为了阻塞 I/O 完成)。它们 "hard" 是因为它们使您的代码变得更加复杂 - 以至于大多数开发人员从未使用过它们而只是坚持使用同步代码。

Have I correctly understood this?

是的。通常,.NET 中的所有异步 I/O 都是使用作为线程池的一部分存在的单个 I/O 完成端口实现的。无论 API 是 APM、EAP 还是 TAP,都是如此。

使用 TAP 的 async/await 的整个想法是核心 Tasks(就像从 GetResponseAsync 返回的那些)仍然建立在相同的异步之上I/O 系统,然后 async/await 使消费它们更加愉快;您可以使用 await 的相同方法,而不是搞乱回调 (APM) 或事件处理程序 (EAP)。

作为一个有趣的旁注,Task 实际上实现了 IAsyncResult,从高层次的角度来看,APM 和 TAP 非常相似(IAsyncResultTask表示一个操作 "in flight").

您应该会发现您的 TAP 代码比当前的 APM/EAP 代码简单得多(并且更易于维护!),并且性能没有明显变化。

(附带说明一下,考虑转移到 HttpClient,它是从头开始设计的,考虑到了 TAP,而不是 HttpWebRequest/WebClient,它有 TAP固定在他们身上)。

然而...

I have a system that spawns a LOT of sub-processes that must run in parallel...

对于这种 "pipeline",您可能需要考虑转换为 TPL 数据流。 Dataflow 了解同步和异步 (TAP) 工作,并且内置了对节流的支持。数据流方法可以比 TAP 本身更进一步地简化您的代码。

根据@Stephen Cleary 的回答,我设置了一个简短的测试来进一步证明这一点。

下面的代码,当 运行 同步方法时,不修改 SetMinThreads,并且当目标网站需要几秒钟到 return 时,将为每个请求打开一个线程.它将显示越来越多的活动线程,它会立即启动前几个任务,但随后 "choke up" 当它达到 ThreadPool 的限制并且只允许每半秒启动一次新线程,或者当旧请求结束。

设置更高的 MinThreadCount 会按预期延迟问题。

保持 MinThread 计数未设置,但切换到异步 (APM) 方法或等待 (TAP) 方法会导致所有任务立即启动,并且任何时候活动的线程数都保持在较低水平。

using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace LockTraceParser
{
  internal class AsyncThreadsTester
  {
    public void Run()
    {
      //ThreadPool.SetMinThreads(100, 100);

      Console.WriteLine("Beginning Test: ");
      LogThreadCounts();

      Test();
    }

    private void Test()
    {
      LogThreadCounts();

      for (int i = 0; i < 65; i++)
      {
        //StartParallelUserWorkItem(i);
        StartTask(i);
        Thread.Sleep(100); //sleep a while so that the other thread is working
        LogThreadCounts();
      }

      for (int i = 0; i < 40; i++)
      {
        Thread.Sleep(1100); //sleep a while so that the other thread is working
        LogThreadCounts();
      }
    }

    private void StartTask(int label)
    {
      var taskLabel = "Task " + label;
      Console.WriteLine("Enqueue " + taskLabel);
      Task.Run(() => GetResponseAwait(taskLabel));
    }

    private static void LogThreadCounts()
    {
      int worker;
      int io;
      ThreadPool.GetAvailableThreads(out worker, out io);
      Console.WriteLine("Worker Threads Available:" + '\t' + worker + '\t' + "IO Threads Available:" + '\t' + io + '\t' +
                        "Threads held by Process: " + '\t' + Process.GetCurrentProcess().Threads.Count);
    }


    private void GetResponseSync(object label)
    {
      Console.WriteLine("Start Sync     " + label);
      try
      {
        var req = GetRequest();
        using (var resp = req.GetResponse())
        {
          Console.WriteLine(resp.ContentLength);
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("Error response " + label);
      }
      Console.WriteLine("End response   " + label);
    }

    private void BeginResponseAsync(object label)
    {
      Console.WriteLine("Start Async     " + label);
      try
      {
        var req = GetRequest();
        req.BeginGetResponse(EndGetResponseAsync, req);
      }
      catch (Exception e)
      {
        Console.WriteLine("Error Async " + label);
      }
    }

    private void EndGetResponseAsync(IAsyncResult result)
    {
      Console.WriteLine("Respond Async   ");
      var req = (WebRequest)result.AsyncState;

      using (var resp = req.EndGetResponse(result))
      {
        Console.WriteLine(resp.ContentLength);
      }
      Console.WriteLine("End Async   ");
    }

    private async Task GetResponseAwait(object label)
    {
      Console.WriteLine("Start Await     " + label);
      try
      {
        var req = GetRequest();
        using (var resp = await req.GetResponseAsync())
        {
          Console.WriteLine(resp.ContentLength);
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("Error Await " + label);
      }
      Console.WriteLine("End Await   " + label);
    }

    private WebRequest GetRequest()
    {
      var req = WebRequest.Create("http://aslowwebsite.com");
      req.Timeout = (int)TimeSpan.FromSeconds(60).TotalMilliseconds;

      return req;
    }
  }
}