TaskCanceledException when appending content concurrently using ConcurrentAppendAsync with CancellationToken.None

I have written a logging system that writes JSON-formatted log records to a file in Azure Data Lake. Multiple threads write to that file, so I use the ConcurrentAppendAsync method.

I initialize access like this:

    var clientCredential = new ClientCredential(ClientId, ClientSecret);
    var creds = ApplicationTokenProvider.LoginSilentAsync(Domain, clientCredential).GetAwaiter().GetResult();
    fileSystemClient = new DataLakeStoreFileSystemManagementClient(creds);
    fileSystem = fileSystemClient.FileSystem;

I then write the content like this:

    private async Task WriteToDataLakeAsync(IGrouping<string, LogEvent> logEvents)
    {
        var logEvent = logEvents.FirstOrDefault();
        if (logEvent == null)
            return;

        var filename = string.Format(@"Logs\{0:yyyy}\{0:MM}\{0:dd}\{1}\{2}.json", logEvent.Session.Timestamp, logEvent.Session.Origin, logEvent.Session.SessionId);

        var jsonBuilder = new StringBuilder();

        foreach (var @event in logEvents)
        {
            jsonBuilder.AppendLine(JsonConvert.SerializeObject(@event.SimplifyForBlob(), Formatting.None));
        }

        try
        {
            await fileSystem.ConcurrentAppendAsync(Account, filename, new MemoryStream(Encoding.UTF8.GetBytes(jsonBuilder.ToString())), AppendModeType.Autocreate, cancellationToken: CancellationToken.None);
        }
        catch (Exception ex)
        {
            telemetryClient.TrackException(new ExceptionTelemetry(new Exception($"{nameof(DataLake)}: Failed to write {filename}", ex)));
        }
    }

The problem is that, even though I pass CancellationToken.None to fileSystem.ConcurrentAppendAsync(...), I still get a TaskCanceledException in my catch block:

    System.Exception: System.Threading.Tasks.TaskCanceledException:
       at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at Microsoft.Rest.RetryDelegatingHandler+<>c__DisplayClass11_0+<b__1>d.MoveNext (Microsoft.Rest.ClientRuntime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
       at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at Microsoft.Rest.RetryDelegatingHandler+d__11.MoveNext (Microsoft.Rest.ClientRuntime, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
       at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at System.Net.Http.HttpClient+d__58.MoveNext (System.Net.Http, Version=4.1.1.1, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a)
       at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at Microsoft.Azure.Management.DataLake.Store.FileSystemOperations+d__10.MoveNext (Microsoft.Azure.Management.DataLake.Store, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
       at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at Microsoft.Azure.Management.DataLake.Store.FileSystemOperationsExtensions+d__3.MoveNext (Microsoft.Azure.Management.DataLake.Store, Version=2.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
       at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089)
       at DHS.PlanCare.CentralLogging.EventHub.Processors.StreamProcessors.DataLake+d__15.MoveNext (DHS.PlanCare.CentralLogging.EventHub.Processors, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null: D:\Sources\DHS\DHS.PlanCare.ManagementPortal\DHS.PlanCare.ManagementPortal.EventHub.Processors\StreamProcessors\DataLake.cs: 95)

(Line 95 is the call to ConcurrentAppendAsync.)

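What is going on here? It does not help that the docs say nothing about this exception. The docs even contain an error: they state that when no CancellationToken is passed the value will be null, which is impossible because CancellationToken is a value type and therefore cannot be null.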

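So I know for certain that I am not initiating any kind of cancellation. The question is whether I can safely ignore this exception, or whether the data is not written at all. Because a large volume of data is ingested and this only happens occasionally, it is hard to check whether the data was actually written.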

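The HttpClient used internally throws an OperationCanceledException on timeout (TaskCanceledException, the type in your stack trace, derives from it). These timeouts can occur during an active connection or while trying to establish one. DataLakeStoreFileSystemManagementClient has a constructor with an optional clientTimeoutInMinutes parameter that sets HttpClient.Timeout.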

In this case it is actually Microsoft.Rest.RetryDelegatingHandler that throws the exception, in response to the cancellation triggered by HttpClient.

This kind of exception also often originates from an internal CancellationTokenSource.CancelAfter, which can likewise be used to implement timeouts.
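To make the mechanism concrete, here is a minimal sketch that reproduces the behavior without any Azure dependency. It assumes a hypothetical `SlowHandler` standing in for a slow server and a hypothetical `TimeoutDemo` class; neither is part of the Data Lake SDK. The caller passes CancellationToken.None, yet the await still ends in an OperationCanceledException, because HttpClient cancels its own internal token once HttpClient.Timeout elapses.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a slow server: it waits far longer than the client timeout.
sealed class SlowHandler : HttpMessageHandler
{
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        // This token is supplied by HttpClient and is cancelled when the timeout elapses,
        // even though the caller passed CancellationToken.None.
        await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
        return new HttpResponseMessage(HttpStatusCode.OK);
    }
}

static class TimeoutDemo
{
    // Returns true if the timeout surfaced as a cancellation exception.
    public static async Task<bool> TimesOutAsCancellationAsync()
    {
        using (var client = new HttpClient(new SlowHandler()))
        {
            client.Timeout = TimeSpan.FromMilliseconds(200);
            try
            {
                // No network access happens; SlowHandler intercepts the request.
                await client.GetAsync("http://example.invalid/", CancellationToken.None);
                return false;
            }
            catch (OperationCanceledException)
            {
                // TaskCanceledException derives from OperationCanceledException.
                return true;
            }
        }
    }
}
```

The SDK's retry handler sits between your code and HttpClient, which is why the trace in the question shows the exception passing through RetryDelegatingHandler.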

Because it is useful to distinguish a timeout from a genuine cancellation request, I usually wrap the call like this:

    try
    {
        ...
    }
    catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested)
    {
        throw new TimeoutException("The operation timed out.", ex);
    }
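If several call sites need this pattern, it could be factored into a small helper. The following is only a sketch: `CancellationHelpers` and `TranslateTimeoutAsync` are hypothetical names introduced for illustration, not part of any SDK. It applies the same exception filter as above, so a cancellation the caller actually requested still propagates unchanged.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CancellationHelpers
{
    // Hypothetical helper: runs the operation and rethrows a cancellation
    // the caller did not request as a TimeoutException.
    public static async Task<T> TranslateTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        CancellationToken cancellationToken)
    {
        try
        {
            return await operation(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested)
        {
            // The caller's token was never cancelled, so treat this as a timeout.
            throw new TimeoutException("The operation timed out.", ex);
        }
    }
}
```

With CancellationToken.None the filter always matches, so every cancellation raised inside the operation is reported as a TimeoutException with the original exception preserved as InnerException.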