在 TPL 数据流管道中导致死锁的异常
Exception causing deadlock in TPL Dataflow pipeline
我使用 BufferBlock
、TransformBlock
和 ActionBlock
创建了一个 DataFlow 管道。由于 TransformBlock
中的异常,应用程序将死锁。我正在使用 BoundedCapacity
.
限制数据
我的代码是这样的:
public async Task PerformOperation()
{
var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
var fetchApiResponse = new TransformBlock<ObjA, ObjA>((item) => {
//Call an api to fetch result.
//Here for some data i get exception
return ObjA;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
var finalBlock = new ActionBlock<ObjA>((item) => {
if (item != null)
{
SaveToDB(item);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });
bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });
await FetchData(bufferBlock);
bufferBlock.Complete();
await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
}
public async Task FetchData(bufferBlock)
{
List<ObjA> dataToProcessList = GetFromDB();
foreach (var item in dataToProcessList)
{
await bufferBlock.SendAsync(item);
}
}
这里,如果 fetchApiResponse
块中出现异常,则数据不会移动,并且会进入死锁状态。
如何处理此管道中的异常?
这里大约有 200,000 条记录被推送到 bufferBlock。
在不导致此死锁的情况下处理异常的最佳方法是什么?
更新 1:
还添加了 FetchData
方法。
谢谢
比尼尔
与其试图弄清楚什么是错误的,哪些是错误的,块应该不允许未处理的异常。这是一个很常见的模式,也用于[Go pipelines]9https://blog.golang.org/pipelines)
文章 Exception Handling in TPL Dataflow Networks 解释了如何处理异常。
- 当抛出未处理的异常时,只有在所有并发操作完成后,块才会进入故障状态。
- 该状态传播到
PropagateCompletion
设置为 true
的链接块。不过,这并不意味着下游块会立即出现故障。
等待错误块抛出。该行:
await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
应该抛出,除非那些块仍然很忙。
解决方案 - 不允许未处理的异常
Return 一个 Result
对象。例如,当必须进行 1000 次 HTTP 调用时,无论如何让一个异常阻止其他 900 次调用并不是一个好主意。这与 Railroad-oriented programming 大致相似。数据流管道与功能管道非常相似。
每个块应该 return 一个 Result<T>
class 包装实际结果并以某种方式指示成功或失败。异常处理块应该捕获任何异常和 return 错误的 Result<T>
项。 LinkTo
方法可以有一个谓词,允许将失败的结果重定向到例如日志记录块或 NullBlock。
假设我们有这个简单的 Result<T>
:
class Result<T>
{
public T Value{get;}
public Exception Exception{get;}
public bool Ok {get;}
public Result(){}
public Result(T value)
{
Value=value;
Ok=true;
}
public Result(Exception exc)
{
Exception=exc;
Ok=false;
}
}
fetchApiResponse
可以是:
var fetchApiResponse = new TransformBlock<TA, Result<TA>>((item) => {
try
{
...
return new Result(ObjA,true);
}
catch(Exception exc)
{
return new Result(exc);
}
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
和 LinkTo
代码可以是:
var propagate=new DataflowLinkOptions { PropagateCompletion = true };
var nullBlock=DataflowBlock.NullTarget<Result<TA>>();
fetchApiResponse.Linkto(logger,propagage,msg=>!msg.Ok);
fetchApiResponse.LinkTo(finalBlock,propagate,msg=>msg.Ok);
在这种情况下,错误消息会简单地转储到空块中。
没有理由使用另一个缓冲块,或等待所有块。 TransformBlock 和 ActionBlock 都有一个由 ExecutionDataflowBlockOptions
选项控制的输入缓冲区。
发布消息并等待完成可以是:
await FetchData(fetchApiResponse);
fetchApiResponse.Complete();
await finalBlock.Completion;
finalBlock
中的空检查也可以删除,如果 fetchApiResponse
return 一个空的 Result
对象,如果没有有效结果。
更复杂的场景可以由更复杂的 Result
对象处理。
突然终止
即使管道需要立即终止,也不应该有任何未处理的异常。故障可能会向下游传播,但不会影响 upstream 块。他们会将消息保存在内存中并继续接受输入,即使管道的其余部分已损坏。
这看起来绝对像是一个僵局。
对此的解决方案是使用 CancellationTokenSource
,将其令牌传递给 所有 块,并在需要终止管道时向其发出信号。
这是常见的做法,例如在 Go 中,正是出于这个原因,使用像 CancellationTokenSource 这样的通道,并取消下游 和 上游块。这在 Go Concurrency Patterns: Pipelines and cancellation
中有描述
如果一个块决定没有理由继续工作,那么提前取消很有用,而不仅仅是在出现错误的情况下。在这种情况下,它可以发出 CancellationTokenSource 信号来停止上游块
我无法通过@Panagiotis Kanavos 的post。同时,我已经像这样更新了我的代码以根据评论处理异常。
public async Task PerformOperation()
{
try
{
var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1
});
var fetchApiResponse = new TransformBlock<ObjA, ObjA>((item) => {
//Call an api to fetch result.
//Here for some data i get exception
try
{
int apiResult = await apiCall();
}
catch(Exception ex)
{
**var dataflowBlock = (IDataflowBlock)bufferBlock;
dataflowBlock.Fault(ex);
throw ex;**
}
return ObjA;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
var finalBlock = new ActionBlock<ObjA>((item) => {
if (item != null)
{
SaveToDB(item);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });
bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });
await FetchData(bufferBlock);
bufferBlock.Complete();
await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
}
catch(AggregateException aex)
{ //logging the exceptions in aex }
catch(Exception ex)
{ //logging the exception}
}
public async Task FetchData(bufferBlock)
{
List<ObjA> dataToProcessList = GetFromDB();
foreach (var item in dataToProcessList)
{
if(!await bufferBlock.SendAsync(item))
{
break; //breaking the loop to stop pushing data.
}
}
}
现在这将停止管道并且不会陷入僵局。由于我正在处理大量数据,我计划为异常添加一个计数器,如果它超过特定限制,那么我只会停止管道。如果一个小的网络故障导致一个 api 调用失败,它可能适用于下一个数据。
我将浏览新的 posts 并更新我的代码以使事情变得更好。
请提供意见。
谢谢
比尼尔
我使用 BufferBlock
、TransformBlock
和 ActionBlock
创建了一个 DataFlow 管道。由于 TransformBlock
中的异常,应用程序将死锁。我正在使用 BoundedCapacity
.
我的代码是这样的:
public async Task PerformOperation()
{
var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
var fetchApiResponse = new TransformBlock<ObjA, ObjA>((item) => {
//Call an api to fetch result.
//Here for some data i get exception
return ObjA;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
var finalBlock = new ActionBlock<ObjA>((item) => {
if (item != null)
{
SaveToDB(item);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });
bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });
await FetchData(bufferBlock);
bufferBlock.Complete();
await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
}
public async Task FetchData(bufferBlock)
{
List<ObjA> dataToProcessList = GetFromDB();
foreach (var item in dataToProcessList)
{
await bufferBlock.SendAsync(item);
}
}
这里,如果 fetchApiResponse
块中出现异常,则数据不会移动,并且会进入死锁状态。
如何处理此管道中的异常?
这里大约有 200,000 条记录被推送到 bufferBlock。
在不导致此死锁的情况下处理异常的最佳方法是什么?
更新 1:
还添加了 FetchData
方法。
谢谢 比尼尔
与其试图弄清楚什么是错误的,哪些是错误的,块应该不允许未处理的异常。这是一个很常见的模式,也用于[Go pipelines]9https://blog.golang.org/pipelines)
文章 Exception Handling in TPL Dataflow Networks 解释了如何处理异常。
- 当抛出未处理的异常时,只有在所有并发操作完成后,块才会进入故障状态。
- 该状态传播到
PropagateCompletion
设置为true
的链接块。不过,这并不意味着下游块会立即出现故障。
等待错误块抛出。该行:
await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
应该抛出,除非那些块仍然很忙。
解决方案 - 不允许未处理的异常
Return 一个 Result
对象。例如,当必须进行 1000 次 HTTP 调用时,无论如何让一个异常阻止其他 900 次调用并不是一个好主意。这与 Railroad-oriented programming 大致相似。数据流管道与功能管道非常相似。
每个块应该 return 一个 Result<T>
class 包装实际结果并以某种方式指示成功或失败。异常处理块应该捕获任何异常和 return 错误的 Result<T>
项。 LinkTo
方法可以有一个谓词,允许将失败的结果重定向到例如日志记录块或 NullBlock。
假设我们有这个简单的 Result<T>
:
class Result<T>
{
public T Value{get;}
public Exception Exception{get;}
public bool Ok {get;}
public Result(){}
public Result(T value)
{
Value=value;
Ok=true;
}
public Result(Exception exc)
{
Exception=exc;
Ok=false;
}
}
fetchApiResponse
可以是:
var fetchApiResponse = new TransformBlock<TA, Result<TA>>((item) => {
try
{
...
return new Result(ObjA,true);
}
catch(Exception exc)
{
return new Result(exc);
}
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
和 LinkTo
代码可以是:
var propagate=new DataflowLinkOptions { PropagateCompletion = true };
var nullBlock=DataflowBlock.NullTarget<Result<TA>>();
fetchApiResponse.Linkto(logger,propagage,msg=>!msg.Ok);
fetchApiResponse.LinkTo(finalBlock,propagate,msg=>msg.Ok);
在这种情况下,错误消息会简单地转储到空块中。
没有理由使用另一个缓冲块,或等待所有块。 TransformBlock 和 ActionBlock 都有一个由 ExecutionDataflowBlockOptions
选项控制的输入缓冲区。
发布消息并等待完成可以是:
await FetchData(fetchApiResponse);
fetchApiResponse.Complete();
await finalBlock.Completion;
finalBlock
中的空检查也可以删除,如果 fetchApiResponse
return 一个空的 Result
对象,如果没有有效结果。
更复杂的场景可以由更复杂的 Result
对象处理。
突然终止
即使管道需要立即终止,也不应该有任何未处理的异常。故障可能会向下游传播,但不会影响 upstream 块。他们会将消息保存在内存中并继续接受输入,即使管道的其余部分已损坏。
这看起来绝对像是一个僵局。
对此的解决方案是使用 CancellationTokenSource
,将其令牌传递给 所有 块,并在需要终止管道时向其发出信号。
这是常见的做法,例如在 Go 中,正是出于这个原因,使用像 CancellationTokenSource 这样的通道,并取消下游 和 上游块。这在 Go Concurrency Patterns: Pipelines and cancellation
中有描述如果一个块决定没有理由继续工作,那么提前取消很有用,而不仅仅是在出现错误的情况下。在这种情况下,它可以发出 CancellationTokenSource 信号来停止上游块
我无法通过@Panagiotis Kanavos 的post。同时,我已经像这样更新了我的代码以根据评论处理异常。
public async Task PerformOperation()
{
try
{
var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1
});
var fetchApiResponse = new TransformBlock<ObjA, ObjA>((item) => {
//Call an api to fetch result.
//Here for some data i get exception
try
{
int apiResult = await apiCall();
}
catch(Exception ex)
{
**var dataflowBlock = (IDataflowBlock)bufferBlock;
dataflowBlock.Fault(ex);
throw ex;**
}
return ObjA;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
var finalBlock = new ActionBlock<ObjA>((item) => {
if (item != null)
{
SaveToDB(item);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });
bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });
await FetchData(bufferBlock);
bufferBlock.Complete();
await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
}
catch(AggregateException aex)
{ //logging the exceptions in aex }
catch(Exception ex)
{ //logging the exception}
}
public async Task FetchData(bufferBlock)
{
List<ObjA> dataToProcessList = GetFromDB();
foreach (var item in dataToProcessList)
{
if(!await bufferBlock.SendAsync(item))
{
break; //breaking the loop to stop pushing data.
}
}
}
现在这将停止管道并且不会陷入僵局。由于我正在处理大量数据,我计划为异常添加一个计数器,如果它超过特定限制,那么我只会停止管道。如果一个小的网络故障导致一个 api 调用失败,它可能适用于下一个数据。
我将浏览新的 posts 并更新我的代码以使事情变得更好。 请提供意见。
谢谢 比尼尔