使用 CancellationToken.None 使用 ConcurrentAppendAsync 并发附加内容时出现 TaskCanceledException
TaskCanceledException when appending content concurrent using ConcurrentAppendAsync using CancellationToken.None
我编写了一个日志系统,可以将 json 格式的日志记录写入 Azure Data Lake 中的文件。多个线程正在写入该文件,所以我使用 ConcurrentAppendAsync
方法。
我这样初始化访问:
var clientCredential = new ClientCredential(ClientId, ClientSecret);
var creds = ApplicationTokenProvider.LoginSilentAsync(Domain, clientCredential).GetAwaiter().GetResult();
fileSystemClient = new DataLakeStoreFileSystemManagementClient(creds);
fileSystem = fileSystemClient.FileSystem;
然后我这样写内容:
private async Task WriteToDataLakeAsync(IGrouping<string, LogEvent> logEvents)
{
var logEvent = logEvents.FirstOrDefault();
if (logEvent == null)
return;
var filename = string.Format(@"Logs\{0:yyyy}\{0:MM}\{0:dd}\{1}\{2}.json", logEvent.Session.Timestamp, logEvent.Session.Origin, logEvent.Session.SessionId);
var jsonBuilder = new StringBuilder();
foreach (var @event in logEvents)
{
jsonBuilder.AppendLine(JsonConvert.SerializeObject(@event.SimplifyForBlob(), Formatting.None));
}
try
{
await fileSystem.ConcurrentAppendAsync(Account, filename, new MemoryStream(Encoding.UTF8.GetBytes(jsonBuilder.ToString())), AppendModeType.Autocreate, cancellationToken: CancellationToken.None);
}
catch (Exception ex)
{
telemetryClient.TrackException(new ExceptionTelemetry(new Exception($"{nameof(DataLake)}: Failed to write {filename}", ex)));
}
}
问题是,尽管我将 CancellationToken.None
传递给 fileSystem.ConcurrentAppendAsync(...)
,但我的 catch 块中仍然得到 TaskCanceledException
:
System.Exception:
System.Threading.Tasks.TaskCanceledException:
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at Microsoft.Rest.RetryDelegatingHandler+<>c__DisplayClass11_0+<b__1>d.MoveNext (Microsoft.Rest.ClientRuntime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at Microsoft.Rest.RetryDelegatingHandler+d__11.MoveNext (Microsoft.Rest.ClientRuntime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at System.Net.Http.HttpClient+d__58.MoveNext (System.Net.Http, Version=4.1.1.1, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at Microsoft.Azure.Management.DataLake.Store.FileSystemOperations+d__10.MoveNext (Microsoft.Azure.Management.DataLake.Store, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at Microsoft.Azure.Management.DataLake.Store.FileSystemOperationsExtensions+d__3.MoveNext (Microsoft.Azure.Management.DataLake.Store, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
at DHS.PlanCare.CentralLogging.EventHub.Processors.StreamProcessors.DataLake+d__15.MoveNext (DHS.PlanCare.CentralLogging.EventHub.Processors, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null: D:\Sources\DHS\DHS.PlanCare.ManagementPortal\DHS.PlanCare.ManagementPortal.EventHub.Processors\StreamProcessors\DataLake.cs: 95)
(第 95 行是对 ConcurrentAppendAsync
的调用)
这里会发生什么? the docs 对这个异常只字不提也于事无补。文档中甚至有一个错误,因为它指出当未传递 CancellationToken 时,该值将为 null
,这是不可能的,因为 CancellationToken 是值类型,因此不能为 null
.
所以我确实知道我不会发起任何类型的取消。问题是,我可以安全地忽略这个异常,还是根本不会写入数据。由于摄入的数据量很大,而且这种情况偶尔会发生,因此很难检查数据是否实际写入。
内部使用的 HttpClient
在超时时抛出 OperationCanceledException
。这些超时可能发生在活动连接期间或尝试建立连接时。 DataLakeStoreFileSystemManagementClient
有一个构造函数,它带有一个可选的 clientTimeoutInMinutes
参数,用于设置 HttpClient.Timeout
.
在这种情况下,实际上是 Microsoft.Rest.RetryDelegatingHandler
响应 HttpClient
设置的取消而抛出异常。
它通常也来自内部 CancellationTokenSource.CancelAfter
,也可用于超时。
因为区分超时和真正的取消请求很有用,我通常用这样的方式包装:
try
{
...
}
catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested)
{
throw new TimeoutException("The operation timed out.", ex);
}
我编写了一个日志系统,可以将 json 格式的日志记录写入 Azure Data Lake 中的文件。多个线程正在写入该文件,所以我使用 ConcurrentAppendAsync
方法。
我这样初始化访问:
var clientCredential = new ClientCredential(ClientId, ClientSecret);
var creds = ApplicationTokenProvider.LoginSilentAsync(Domain, clientCredential).GetAwaiter().GetResult();
fileSystemClient = new DataLakeStoreFileSystemManagementClient(creds);
fileSystem = fileSystemClient.FileSystem;
然后我这样写内容:
private async Task WriteToDataLakeAsync(IGrouping<string, LogEvent> logEvents)
{
var logEvent = logEvents.FirstOrDefault();
if (logEvent == null)
return;
var filename = string.Format(@"Logs\{0:yyyy}\{0:MM}\{0:dd}\{1}\{2}.json", logEvent.Session.Timestamp, logEvent.Session.Origin, logEvent.Session.SessionId);
var jsonBuilder = new StringBuilder();
foreach (var @event in logEvents)
{
jsonBuilder.AppendLine(JsonConvert.SerializeObject(@event.SimplifyForBlob(), Formatting.None));
}
try
{
await fileSystem.ConcurrentAppendAsync(Account, filename, new MemoryStream(Encoding.UTF8.GetBytes(jsonBuilder.ToString())), AppendModeType.Autocreate, cancellationToken: CancellationToken.None);
}
catch (Exception ex)
{
telemetryClient.TrackException(new ExceptionTelemetry(new Exception($"{nameof(DataLake)}: Failed to write {filename}", ex)));
}
}
问题是,尽管我将 CancellationToken.None
传递给 fileSystem.ConcurrentAppendAsync(...)
,但我的 catch 块中仍然得到 TaskCanceledException
:
System.Exception: System.Threading.Tasks.TaskCanceledException: at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at Microsoft.Rest.RetryDelegatingHandler+<>c__DisplayClass11_0+<b__1>d.MoveNext (Microsoft.Rest.ClientRuntime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at Microsoft.Rest.RetryDelegatingHandler+d__11.MoveNext (Microsoft.Rest.ClientRuntime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at System.Net.Http.HttpClient+d__58.MoveNext (System.Net.Http, Version=4.1.1.1, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at Microsoft.Azure.Management.DataLake.Store.FileSystemOperations+d__10.MoveNext (Microsoft.Azure.Management.DataLake.Store, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at Microsoft.Azure.Management.DataLake.Store.FileSystemOperationsExtensions+d__3.MoveNext (Microsoft.Azure.Management.DataLake.Store, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089) at DHS.PlanCare.CentralLogging.EventHub.Processors.StreamProcessors.DataLake+d__15.MoveNext (DHS.PlanCare.CentralLogging.EventHub.Processors, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null: D:\Sources\DHS\DHS.PlanCare.ManagementPortal\DHS.PlanCare.ManagementPortal.EventHub.Processors\StreamProcessors\DataLake.cs: 95)
(第 95 行是对 ConcurrentAppendAsync
的调用)
这里会发生什么? the docs 对这个异常只字不提也于事无补。文档中甚至有一个错误,因为它指出当未传递 CancellationToken 时,该值将为 null
,这是不可能的,因为 CancellationToken 是值类型,因此不能为 null
.
所以我确实知道我不会发起任何类型的取消。问题是,我可以安全地忽略这个异常,还是根本不会写入数据。由于摄入的数据量很大,而且这种情况偶尔会发生,因此很难检查数据是否实际写入。
HttpClient
在超时时抛出 OperationCanceledException
。这些超时可能发生在活动连接期间或尝试建立连接时。 DataLakeStoreFileSystemManagementClient
有一个构造函数,它带有一个可选的 clientTimeoutInMinutes
参数,用于设置 HttpClient.Timeout
.
在这种情况下,实际上是 Microsoft.Rest.RetryDelegatingHandler
响应 HttpClient
设置的取消而抛出异常。
它通常也来自内部 CancellationTokenSource.CancelAfter
,也可用于超时。
因为区分超时和真正的取消请求很有用,我通常用这样的方式包装:
try
{
...
}
catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested)
{
throw new TimeoutException("The operation timed out.", ex);
}