Why are my async continuations scheduled outside of the thread pool?
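Please read the question before marking it as a duplicate. This question implements the solution offered in this question and still runs into the deadlock.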
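I am debugging a large multithreaded application that makes many concurrent calls to various Google APIs through the .NET client libraries, and we occasionally hit a deadlock during some of the requests. Some information about the application:
- The application is a Windows service (SynchronizationContext is null)
- The .NET Framework version is 4.5
- None of the threads making the API requests are in the default thread pool.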
Specifically, we are calling the Execute() method, which calls an async method and blocks while waiting for its result:
public TResponse Execute()
{
try
{
using (var response = ExecuteUnparsedAsync(CancellationToken.None).Result)
{
return ParseResponse(response).Result;
}
}
...
}
That in turn calls ExecuteUnparsedAsync(), which performs HttpClient.SendAsync():
private async Task<HttpResponseMessage> ExecuteUnparsedAsync(CancellationToken cancellationToken)
{
using (var request = CreateRequest())
{
return await service.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
}
}
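In the simplest case, the expectation behind this code does hold. The following is a minimal, self-contained sketch, not the Google library itself: Task.Delay stands in for HttpClient.SendAsync, and the class and method names are hypothetical. It blocks a dedicated, non-pool thread with no SynchronizationContext on .Result, the same way Execute() does. The delay completes on a thread-pool timer thread, and ConfigureAwait(false) means there is no context to return to, so the continuation runs on a pool thread and the blocking wait returns.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class SyncOverAsyncDemo
{
    // Hypothetical stand-in for ExecuteUnparsedAsync; Task.Delay replaces SendAsync.
    static async Task<bool> StandInAsync()
    {
        await Task.Delay(100).ConfigureAwait(false);
        // Report whether the code after the await is running on a thread-pool thread.
        return Thread.CurrentThread.IsThreadPoolThread;
    }

    // Blocks a dedicated (non-pool) thread on .Result, like Execute() does.
    public static bool ContinuationRanOnPool()
    {
        bool result = false;
        var thread = new Thread(() =>
        {
            // Like the Windows service: no SynchronizationContext, not a pool thread.
            result = StandInAsync().Result;
        });
        thread.Start();
        thread.Join();
        return result;
    }
}
```

Calling `SyncOverAsyncDemo.ContinuationRanOnPool()` is expected to return `true`. This only shows that the basic pattern can work; it says nothing about what happens under load inside HttpClient.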
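Now, I understand there are a number of ways we can hit a deadlock here, and that the best way to avoid them would be to change the application to use async methods. Unfortunately, that would be a significant time investment that isn't possible right now, though it may be in the future.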
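My specific issue is that none of the calling threads are in the thread pool, and because ConfigureAwait(false) is being called, I would expect the continuation to always run on the thread pool, but that is not the case. Instead, it appears the continuation is scheduled on the original calling thread, and that thread deadlocks because it is blocked waiting for the result.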
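Note that ConfigureAwait(false) does not mean "continue on the thread pool"; it means "do not marshal back to a captured SynchronizationContext or TaskScheduler". When there is nothing to marshal back to, the continuation usually runs synchronously on whichever thread completes the awaited task. A sketch with hypothetical names, in which a dedicated non-pool thread completes a TaskCompletionSource: the code after `await ... ConfigureAwait(false)` runs on that non-pool thread before SetResult returns. This is one way a continuation can end up outside the thread pool; whether it is what happens inside HttpClient here is not established by this example. (For TaskCompletionSource, TaskCreationOptions.RunContinuationsAsynchronously forces continuations off the completing thread, but it is only available from .NET Framework 4.6, not 4.5.)

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class InlineContinuationDemo
{
    static async Task<int> ContinuationThreadAsync(Task antecedent)
    {
        await antecedent.ConfigureAwait(false);
        // Runs on whichever thread completed the antecedent (when inlining is allowed).
        return Thread.CurrentThread.ManagedThreadId;
    }

    // Returns { id of the completing thread, id of the thread that ran the continuation }.
    public static int[] Run()
    {
        var tcs = new TaskCompletionSource<bool>();
        Task<int> continuation = ContinuationThreadAsync(tcs.Task);

        int completerId = -1;
        var completer = new Thread(() =>
        {
            completerId = Thread.CurrentThread.ManagedThreadId;
            // SetResult runs the await continuation synchronously on this
            // dedicated thread before it returns.
            tcs.SetResult(true);
        });
        completer.Start();
        completer.Join();

        return new[] { completerId, continuation.Result };
    }
}
```

Both ids returned by `InlineContinuationDemo.Run()` are expected to be the same.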
Using the following MCVE, I can produce a deadlock within a few hours.
using Google.Apis.Auth.OAuth2;
using Google.Apis.Drive.v2;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace DeadlockTest
{
class Program
{
const int NUM_THREADS = 70;
static long[] s_lastExecute = new long[NUM_THREADS];
static long count = 0;
static void Main(string[] args)
{
ServicePointManager.DefaultConnectionLimit = 50;
for(int i = 0; i < s_lastExecute.Length; i++)
{
s_lastExecute[i] = DateTime.Now.ToBinary();
}
Thread deadlockCheck = new Thread(new ThreadStart(CheckForDeadlock));
deadlockCheck.Start();
RunThreads();
deadlockCheck.Join();
}
static void RunThreads()
{
List<Thread> threads = new List<Thread>();
for (int i = 0; i < NUM_THREADS; i++)
{
int threadIndex = i;
Thread thread = new Thread(
new ParameterizedThreadStart(BeginThread));
thread.Start(threadIndex);
threads.Add(thread);
}
foreach(var thread in threads)
{
thread.Join();
}
}
static void BeginThread(object threadIndex)
{
Debug.Assert(SynchronizationContext.Current == null);
Debug.Assert(Thread.CurrentThread.IsThreadPoolThread == false);
ThreadLoop((int)threadIndex);
}
static void ThreadLoop(int threadIndex)
{
Random random = new Random(threadIndex);
while (true)
{
try
{
GoogleDrive.Test(random);
}
catch(Exception ex)
{
Console.WriteLine(ex.Message);
}
Interlocked.Exchange(ref s_lastExecute[threadIndex], DateTime.Now.ToBinary());
Interlocked.Increment(ref count);
}
}
private static void CheckForDeadlock()
{
Console.WriteLine("Deadlock check started");
TimeSpan period = TimeSpan.FromMinutes(1);
TimeSpan deadlockThreshold = TimeSpan.FromMinutes(10);
while (true)
{
Thread.Sleep((int)period.TotalMilliseconds);
DateTime now = DateTime.Now;
TimeSpan oldestUpdate = TimeSpan.MinValue;
for (int i = 0; i < NUM_THREADS; i++)
{
DateTime lastExecute = DateTime.FromBinary(
Interlocked.Read(ref s_lastExecute[i]));
TimeSpan delta = now - lastExecute;
if(delta > oldestUpdate)
{
oldestUpdate = delta;
}
if (delta > deadlockThreshold)
{
var msg = string.Format("Deadlock detected in thread {0} for {1} minutes",
i.ToString(), (now - lastExecute).TotalMinutes);
Console.WriteLine(msg);
}
}
int workerThreads, completionPortThreads;
System.Threading.ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
Console.WriteLine("Checked for deadlocks.");
Console.WriteLine("\tWorker threads: " + workerThreads.ToString());
Console.WriteLine("\tCompletion port threads: " + completionPortThreads.ToString());
Console.WriteLine("\tExecute calls: " + Interlocked.Read(ref count).ToString());
Console.WriteLine("\tOldest update (minutes): " + oldestUpdate.TotalMinutes.ToString());
}
}
}
class GoogleDrive
{
const string SERVICE_ACCOUNT = @"<path_to_service_account>";
static string[] SCOPES = { DriveService.Scope.Drive };
public static DriveService GetDriveService(string user)
{
GoogleCredential credential;
using (var stream = new FileStream(SERVICE_ACCOUNT, FileMode.Open, FileAccess.Read))
{
credential = GoogleCredential
.FromStream(stream)
.CreateScoped(SCOPES)
.CreateWithUser(user);
}
var service = new DriveService(new DriveService.Initializer()
{
HttpClientInitializer = credential
});
return service;
}
public static void Test(Random random)
{
int userIndex = random.Next(Users.USERS.Length);
string user = Users.USERS[userIndex];
using (DriveService service = GetDriveService(user))
{
var request = service.Files.List();
var result = request.Execute();
}
}
}
public static class Users
{
public static string[] USERS = new string[]
{
"user0000@domain.com",
"user0001@domain.com",
...
};
}
}
Running this test overnight gave me the following:
Deadlock detected in thread 15 for 274.216744496667 minutes
Deadlock detected in thread 45 for 154.73506413 minutes
Deadlock detected in thread 46 for 844.978023301667 minutes
Checked for deadlocks.
	Worker threads: 2045
	Completion port threads: 989
	Execute calls: 2153228
	Oldest update (minutes): 844.978023301667
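ThreadPool.GetAvailableThreads reports how many more threads the pool may still create before reaching its maximum, not how many are in use; the number actually busy is the maximum minus the available count. A small hypothetical helper that derives the busy counts with ThreadPool.GetMaxThreads, which could sit next to the existing output in CheckForDeadlock:

```csharp
using System;
using System.Threading;

static class ThreadPoolUsage
{
    // Returns { busy worker threads, busy completion-port threads },
    // computed as the configured maximum minus the currently available count.
    public static int[] Busy()
    {
        int maxWorker, maxIo, availWorker, availIo;
        ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
        ThreadPool.GetAvailableThreads(out availWorker, out availIo);
        return new[] { maxWorker - availWorker, maxIo - availIo };
    }
}
```

For example, `Console.WriteLine("\tBusy worker threads: " + ThreadPoolUsage.Busy()[0]);` would print the worker count alongside the existing lines.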
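Once a deadlock is detected, I can insert a break in the thread loop to let the running threads exit. That leaves me with the main thread, the timer thread, two threads used by the runtime, and my deadlocked threads (three in this case; also note that the thread IDs don't match, because I wasn't smart enough to use the actual thread IDs).
Each deadlocked thread has the following call stack: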
mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext)
mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout)
mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken)
mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken)
mscorlib.dll!System.Threading.Tasks.Task.InternalWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken)
mscorlib.dll!System.Threading.Tasks.Task<System.Net.Http.HttpResponseMessage>.GetResultCore(bool waitCompletionNotification)
mscorlib.dll!System.Threading.Tasks.Task<System.__Canon>.Result.get()
Google.Apis.dll!Google.Apis.Requests.ClientServiceRequest<Google.Apis.Drive.v2.Data.FileList>.Execute()
> DeadlockTest.exe!DeadlockTest.GoogleDrive.Test(System.Random random)
DeadlockTest.exe!DeadlockTest.Program.ThreadLoop(int threadIndex)
DeadlockTest.exe!DeadlockTest.Program.BeginThread(object threadIndex)
mscorlib.dll!System.Threading.ThreadHelper.ThreadStart_Context(object state)
mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)
mscorlib.dll!System.Threading.ThreadHelper.ThreadStart(object obj)
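The stack shows each thread parked indefinitely inside Task.Result. As a diagnostic aid only (not a fix, and not part of the original code), a hypothetical helper that blocks with a timeout, so a stuck call surfaces as an exception instead of a thread that never returns. Task.Wait(TimeSpan) returns false when the timeout expires:

```csharp
using System;
using System.Threading.Tasks;

static class BoundedWait
{
    // Hypothetical helper: like .Result, but gives up after the timeout.
    // A faulted task still throws AggregateException, exactly as .Result would.
    public static T ResultOrTimeout<T>(Task<T> task, TimeSpan timeout)
    {
        if (!task.Wait(timeout))
        {
            throw new TimeoutException("Task did not complete within " + timeout + ".");
        }
        return task.Result;
    }
}
```

Because Execute() blocks on .Result internally, using this would mean calling ExecuteAsync() yourself, e.g. `BoundedWait.ResultOrTimeout(request.ExecuteAsync(), TimeSpan.FromMinutes(5))`. The timeout only abandons the wait; the underlying request keeps running in the background.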
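This also leaves me with some remaining tasks.
I don't know of a way to find out exactly which thread the tasks are scheduled on, but I think it's pretty clear that they are scheduled for the deadlocked threads (i.e., they are the culprit).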
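One practical way to see where continuations actually run is to log the managed thread ID and IsThreadPoolThread on either side of each await. A sketch with hypothetical names. It also shows one case that can make a continuation look like it ran on the calling thread: if the awaited task has already completed, await does not suspend at all, and execution simply continues synchronously on the caller, with or without ConfigureAwait(false).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ContinuationTracing
{
    // Hypothetical helper: describe the current thread for log output.
    public static string Describe()
    {
        return string.Format("thread {0} (pool: {1})",
            Thread.CurrentThread.ManagedThreadId,
            Thread.CurrentThread.IsThreadPoolThread);
    }

    // Awaits the given task, logging the thread before and after the await,
    // and returns { thread id before, thread id after }.
    public static async Task<int[]> TraceAwaitAsync(Task antecedent)
    {
        int before = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("before await: " + Describe());
        await antecedent.ConfigureAwait(false);
        Console.WriteLine("after await:  " + Describe());
        return new[] { before, Thread.CurrentThread.ManagedThreadId };
    }
}
```

With an already-completed task such as `Task.FromResult(0)`, both ids are the same; with a pending task such as `Task.Delay(50)`, the "after" line typically shows a different, pool thread.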
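Which brings me to my questions:
- Why are the continuation tasks not scheduled on the thread pool instead of the calling thread?
- Is there a way to force the tasks to run on the thread pool without the thread that blocks on the result having to be a thread-pool thread?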
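On the second question, a common workaround (not something the answer below proposes) is to start the async call through Task.Run, so that the synchronous part of the async method, up to its first incomplete await, runs on a pool thread while the dedicated thread blocks on the returned task. A sketch with hypothetical names and Task.Delay standing in for the HTTP call. It does not change where later continuations run, and it still blocks a thread, so it should not be read as a guaranteed fix for the deadlock.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class OffloadDemo
{
    static async Task<bool> StartedOnPoolAsync()
    {
        // Captured in the synchronous prefix, before the first await.
        bool onPool = Thread.CurrentThread.IsThreadPoolThread;
        await Task.Delay(50).ConfigureAwait(false);
        return onPool;
    }

    // Returns { direct call started on pool, Task.Run call started on pool }.
    public static bool[] Compare()
    {
        bool direct = false, offloaded = false;
        var thread = new Thread(() =>
        {
            // Starts on this dedicated, non-pool thread.
            direct = StartedOnPoolAsync().Result;
            // Task.Run unwraps the inner Task<bool>; the method starts on a pool thread.
            offloaded = Task.Run(() => StartedOnPoolAsync()).Result;
        });
        thread.Start();
        thread.Join();
        return new[] { direct, offloaded };
    }
}
```

Applied to the Google library, the equivalent would be `Task.Run(() => request.ExecuteAsync()).Result` in place of `request.Execute()`.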
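I assume the real goal here is to eliminate the deadlocks, and to that end I don't think it even matters which thread your continuations run on. I see two problems: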
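1. Google's Execute() method is flat-out wrong. I would even consider reporting it as a bug, because HttpClient does not support synchronous calls, and blocking on an async call with .Result is asking for deadlocks, period. There is no way around that.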
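2. You are probably increasing the likelihood of deadlocks by forcing new threads into the mix. A common misconception is that concurrency requires threads, and for I/O-bound work that simply isn't true. Waiting on the result of I/O (which is probably what your code spends most of its time doing) requires no thread at all. The underlying subsystems are complex, but they are nicely abstracted away by asynchronous, Task-returning APIs like HttpClient. Use tasks instead of threads, and those subsystems will decide when a new thread is needed and when it isn't.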
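A small illustration of the "no thread" point, with Task.Delay standing in for a pending HTTP request. This is an assumption for the sake of a self-contained example: real network I/O completes through I/O completion ports rather than timers, but in both cases nothing sits blocked while waiting. A thousand concurrent waits finish in roughly the time of one, without a thousand threads being created (names are hypothetical):

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

static class NoThreadDemo
{
    // Starts `count` simulated I/O waits concurrently and returns the total elapsed time.
    public static TimeSpan RunConcurrentWaits(int count, int delayMs)
    {
        var stopwatch = Stopwatch.StartNew();
        // Task.Delay is timer-based: a pending delay holds no thread while it waits.
        Task[] waits = Enumerable.Range(0, count)
                                 .Select(_ => Task.Delay(delayMs))
                                 .ToArray();
        // Only the calling thread blocks here.
        Task.WaitAll(waits);
        return stopwatch.Elapsed;
    }
}
```

`NoThreadDemo.RunConcurrentWaits(1000, 200)` typically returns a little over 200 ms, far below the 200 seconds the same waits would take one after another.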
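All of this leads to the conclusion you were hoping to avoid: go async all the way in your code. You said that isn't feasible right now, so although I hate suggesting code that isn't 100% correct, hopefully a fair compromise is to go halfway, in exchange for a significant reduction in the likelihood of deadlocks.
If it's feasible, my suggestion is to refactor your thread-creation method (RunThreads in your MCVE) to use tasks instead (let's call it RunTasks), and convert everything down the call stack to async, ending with a call to Google's ExecuteAsync instead of Execute. Here is what the important parts would look like: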
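// Assumed: plays the same role as NUM_THREADS in the question.
const int NUM_TASKS = 70;

static void RunTasks()
{
    List<Task> tasks = new List<Task>();
    for (int i = 0; i < NUM_TASKS; i++)
    {
        tasks.Add(BeginTaskAsync(i));
    }
    // Your long-term goal should be to replace this with await Task.WhenAll(tasks);
    // In .NET 4.5, Task.WaitAll takes a Task[] (params), so convert the list first.
    Task.WaitAll(tasks.ToArray());
}
// work down your call stack here and convert methods to use async/await,
// eventually calling await ExecuteAsync() from the Google lib...
static async Task BeginTaskAsync(int taskIndex)
{
    // ...
    await ThreadLoopAsync(taskIndex);
}
static async Task ThreadLoopAsync(int taskIndex)
{
    // Each task has its own Random, so no instance is shared between tasks.
    Random random = new Random(taskIndex);
    while (true)
    {
        try
        {
            await GoogleDrive.TestAsync(random);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        // ... update s_lastExecute / count as in ThreadLoop
    }
}
class GoogleDrive
{
    // ... (SERVICE_ACCOUNT, SCOPES and GetDriveService as in the question)
    public static async Task TestAsync(Random random)
    {
        int userIndex = random.Next(Users.USERS.Length);
        string user = Users.USERS[userIndex];
        using (DriveService service = GetDriveService(user))
        {
            var request = service.Files.List();
            // Await the request instead of blocking on it with Execute().
            var result = await request.ExecuteAsync();
        }
    }
}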
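The "compromise" I mentioned is the call to Task.WaitAll. It is a blocking call, so there is still no guarantee that you won't run into a deadlock here. But if you don't have the time/resources to go async all the way up the call stack properly, this should be a big improvement: your threads block much less, and there are far fewer threads overall.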
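To illustrate the "far fewer threads" point, here is a sketch that mirrors the shape of RunTasks, with Task.Delay standing in for the Google call (hypothetical names; the numbers are illustrative only). It records every thread that runs part of a worker's loop. The count is typically on the order of the number of processor cores rather than the number of workers, though the exact value depends on the machine and the thread pool's heuristics.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class FewThreadsDemo
{
    // Every thread id that executed part of some worker's loop.
    static readonly ConcurrentDictionary<int, bool> s_threadsSeen =
        new ConcurrentDictionary<int, bool>();

    // Stand-in for ThreadLoopAsync: a few iterations of simulated I/O.
    static async Task WorkerAsync(int iterations)
    {
        for (int i = 0; i < iterations; i++)
        {
            s_threadsSeen[Thread.CurrentThread.ManagedThreadId] = true;
            await Task.Delay(10).ConfigureAwait(false);
        }
    }

    // Mirrors RunTasks: start all workers, then block once with Task.WaitAll.
    public static int DistinctThreadsUsed(int workers, int iterations)
    {
        s_threadsSeen.Clear();
        Task[] tasks = Enumerable.Range(0, workers)
                                 .Select(_ => WorkerAsync(iterations))
                                 .ToArray();
        Task.WaitAll(tasks);
        return s_threadsSeen.Count;
    }
}
```

For example, `FewThreadsDemo.DistinctThreadsUsed(500, 5)` runs 500 concurrent workers, where the thread-per-worker design in the question would need 500 dedicated threads.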