在 TPL 数据流管道中导致死锁的异常

Exception causing deadlock in TPL Dataflow pipeline

我使用 BufferBlockTransformBlockActionBlock 创建了一个 DataFlow 管道。由于 TransformBlock 中的异常,应用程序将死锁。我正在使用 BoundedCapacity.

限制数据

我的代码是这样的:

public async Task PerformOperation()
{
    var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
    var fetchApiResponse = new TransformBlock<ObjA, ObjA>((item) => {
        //Call an api to fetch result.
        //Here for some data i get exception
        return ObjA;
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
    var finalBlock = new ActionBlock<ObjA>((item) => {
        if (item != null)
        {
            SaveToDB(item);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });

    bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
    fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await FetchData(bufferBlock);
    bufferBlock.Complete();
    await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
}
public async Task FetchData(bufferBlock)
{
    List<ObjA> dataToProcessList = GetFromDB();
    foreach (var item in dataToProcessList)
    {
        await bufferBlock.SendAsync(item);
    }
}

这里,如果 fetchApiResponse 块中出现异常,则数据不会移动,并且会进入死锁状态。 如何处理此管道中的异常?
这里大约有 200,000 条记录被推送到 bufferBlock。
在不导致此死锁的情况下处理异常的最佳方法是什么?

更新 1: 还添加了 FetchData 方法。

谢谢 比尼尔

与其试图弄清楚什么是错误的,哪些是错误的,块应该允许未处理的异常。这是一个很常见的模式,也用于[Go pipelines]9https://blog.golang.org/pipelines)

文章 Exception Handling in TPL Dataflow Networks 解释了如何处理异常。

  • 当抛出未处理的异常时,只有在所有并发操作完成后,块才会进入故障状态。
  • 该状态传播到 PropagateCompletion 设置为 true 的链接块。不过,这并不意味着下游块会立即出现故障。

等待错误块抛出。该行:

await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);

应该抛出,除非那些块仍然很忙。

解决方案 - 不允许未处理的异常

Return 一个 Result 对象。例如,当必须进行 1000 次 HTTP 调用时,无论如何让一个异常阻止其他 900 次调用并不是一个好主意。这与 Railroad-oriented programming 大致相似。数据流管道与功能管道非常相似。

每个块应该 return 一个 Result<T> class 包装实际结果并以某种方式指示成功或失败。异常处理块应该捕获任何异常和 return 错误的 Result<T> 项。 LinkTo 方法可以有一个谓词,允许将失败的结果重定向到例如日志记录块或 NullBlock。

假设我们有这个简单的 Result<T> :

class Result<T>
{
    public T Value{get;}
    public Exception Exception{get;}
    public bool Ok {get;}

    public Result(){}

    public Result(T value)
    {
        Value=value;
        Ok=true;
    }

    public Result(Exception exc)
    {
        Exception=exc;
        Ok=false;
    }
}

fetchApiResponse 可以是:

    var fetchApiResponse = new TransformBlock<TA, Result<TA>>((item) => {
        try
        {
            ...
            return new Result(ObjA,true);
        }
        catch(Exception exc)
        {
            return new Result(exc);
        }
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });

LinkTo 代码可以是:

var propagate=new DataflowLinkOptions { PropagateCompletion = true };

var nullBlock=DataflowBlock.NullTarget<Result<TA>>();
fetchApiResponse.Linkto(logger,propagage,msg=>!msg.Ok);
fetchApiResponse.LinkTo(finalBlock,propagate,msg=>msg.Ok);

在这种情况下,错误消息会简单地转储到空块中。

没有理由使用另一个缓冲块,或等待所有块。 TransformBlock 和 ActionBlock 都有一个由 ExecutionDataflowBlockOptions 选项控制的输入缓冲区。

发布消息并等待完成可以是:

await FetchData(fetchApiResponse);
fetchApiResponse.Complete();
await finalBlock.Completion;

finalBlock 中的空检查也可以删除,如果 fetchApiResponse return 一个空的 Result 对象,如果没有有效结果。

更复杂的场景可以由更复杂的 Result 对象处理。

突然终止

即使管道需要立即终止,也不应该有任何未处理的异常。故障可能会向下游传播,但不会影响 upstream 块。他们会将消息保存在内存中并继续接受输入,即使管道的其余部分已损坏。

这看起来绝对像是一个僵局。

对此的解决方案是使用 CancellationTokenSource,将其令牌传递给 所有 块,并在需要终止管道时向其发出信号。

这是常见的做法,例如在 Go 中,正是出于这个原因,使用像 CancellationTokenSource 这样的通道,并取消下游 上游块。这在 Go Concurrency Patterns: Pipelines and cancellation

中有描述

如果一个块决定没有理由继续工作,那么提前取消很有用,而不仅仅是在出现错误的情况下。在这种情况下,它可以发出 CancellationTokenSource 信号来停止上游块

我无法通过@Panagiotis Kanavos 的post。同时,我已经像这样更新了我的代码以根据评论处理异常。

public async Task PerformOperation()
{
  try
   {
    var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 
    });
    var fetchApiResponse = new TransformBlock<ObjA, ObjA>((item) => {
        //Call an api to fetch result.
        //Here for some data i get exception
        try
        {
          int apiResult = await apiCall();
        }
        catch(Exception ex)
        {
         **var dataflowBlock = (IDataflowBlock)bufferBlock;
          dataflowBlock.Fault(ex);
          throw ex;**
        }
        return ObjA;
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
    var finalBlock = new ActionBlock<ObjA>((item) => {
        if (item != null)
        {
            SaveToDB(item);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });

    bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
    fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await FetchData(bufferBlock);
    bufferBlock.Complete();
    await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
  }
  catch(AggregateException aex)
  {   //logging the exceptions in aex  }
  catch(Exception ex)
  { //logging the exception}
}
public async Task FetchData(bufferBlock)
{
    List<ObjA> dataToProcessList = GetFromDB();
    foreach (var item in dataToProcessList)
    {
        if(!await bufferBlock.SendAsync(item))
        {
          break; //breaking the loop to stop pushing data.
        }
    }
}

现在这将停止管道并且不会陷入僵局。由于我正在处理大量数据,我计划为异常添加一个计数器,如果它超过特定限制,那么我只会停止管道。如果一个小的网络故障导致一个 api 调用失败,它可能适用于下一个数据。

我将浏览新的 posts 并更新我的代码以使事情变得更好。 请提供意见。

谢谢 比尼尔