链接数据流块如何取消目标块

How does a linked dataflow block cancel the target block

在 TPL 数据流中,当一个块通过传播链接到另一个块时,它会转发异常和取消。我可以想象转发异常只需使用 dataFlowBlock.Fault(exception) 即可完成,但我很好奇取消是如何转发的,因为没有 dataFlowBlock.Cancel() 这样的东西。它是通过相同的 Fault() 方法使用传递 TaskCancelledException 作为参数完成的吗?

更新:

为了澄清,请考虑以下示例,其中只有 block1 是通过选项使用 CancelationToken 创建的,而 block2 不是。 Block1 通过传播链接到 block2:

block1 { CancellationToken = ct } -> block2 { }

当ct 收到取消请求时,block1 完成转换为取消。我的问题是此时 block2 会发生什么? block1 是否主动取消 block2,如果是,它是否使用 block.Fault(TaskCanceledException) 来取消?或者它是否使用了一些内部 ocus-pocus 神奇地取消了 block2,即使它是在没有取消令牌的情况下创建的?

取消以选项 CancellationToken 的形式出现,在 ExecutionDataflowBlockOptions The token passed will complete the blocks successfully. What that means is the blocks will treat an OperationCanceledException 上与管道中抛出的其他异常不同,并导致块在完成处理后简单地完成。取消旨在通过共享单个 CTS 并将关联的令牌用于块选项来工作。然后当取消CTS时,所有块都会得到取消信号。

还可以找到更多 here from MS

在评论中指出你的实际问题,这可能会解释更多。

When a dataflow block is canceled explicitly, the AggregateException object contains OperationCanceledException in the InnerExceptions property

还有:

The TPL provides a mechanism that enables tasks to coordinate cancellation in a cooperative manner. To enable dataflow blocks to participate in this cancellation mechanism, set the CancellationToken property. When this CancellationToken object is set to the canceled state, all dataflow blocks that monitor this token finish execution of their current item but do not start processing subsequent items. These dataflow blocks also clear any buffered messages, release connections to any source and target blocks, and transition to the canceled state. By transitioning to the canceled state, the Completion property has the Status property set to Canceled, unless an exception occurred during processing. In that case, Status is set to Faulted.

Source

这是很多引用的文字,但提供了来源 link 并且解释写得很好。

因此块不会主动传播取消。但是,一旦取消的块完成,将传播设置为 true,完成任务将与取消或故障状态一起流动。

好的,我认为我们与更新后的 post 在同一页面上。简而言之,如果传播不正确,取消不会流向链接块。

[TestFixture]
public class DataFlowTests
{

    [Test]
    public async Task DataflowTest()
    {
        var cts = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 200, CancellationToken = cts.Token });
        var action = new ActionBlock<int>(x => Task.Delay(100), new ExecutionDataflowBlockOptions() { BoundedCapacity = 5 });
        buffer.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = false});

        foreach (var data in Enumerable.Range(0, 20))
        {
            if (data > 10) break;
            await buffer.SendAsync(data);
        }
        cts.Cancel();
        //action.Complete();
        await action.Completion;
        Console.WriteLine(buffer.Completion.Status);
        Console.WriteLine(action.Completion.Status);
    }
}

该示例将永远挂起,等待 action 完成。现在在 ActionBlock<> 上调用 Complete() explicilty 会产生这些结果状态:

Cancelled - buffer
RanToCompletion - action

最终传播完成会产生相同的结果,而无需在下游块上手动调用完成:

[TestFixture]
public class DataFlowTests
{

    [Test]
    public async Task DataflowTest()
    {
        var cts = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 200, CancellationToken = cts.Token });
        var action = new ActionBlock<int>(x => Task.Delay(100), new ExecutionDataflowBlockOptions() { BoundedCapacity = 5 });
        buffer.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true});

        foreach (var data in Enumerable.Range(0, 20))
        {
            if (data > 10) break;
            await buffer.SendAsync(data);
        }
        cts.Cancel();
        await action.Completion;
        Console.WriteLine(buffer.Completion.Status);
        Console.WriteLine(action.Completion.Status);
    }
}

收益状态:

Canceled - buffer
RanToCompletion - action

请注意,取消 不会 导致整个管道被取消。其他块简单地完成。现在,如果除了取消通知之外的异常通过,那么管道将通过 ...Fault(..) 出现故障。否则它会突出标准完成传播。

可以找到每当块将其完成传播到链接块时运行的源代码here or here。下面是此代码的简化版本:

internal static void PropagateCompletion(Task sourceCompletionTask, IDataflowBlock target)
{
    AggregateException exception =
        sourceCompletionTask.IsFaulted ? sourceCompletionTask.Exception : null;

    if (exception != null) target.Fault(exception); else target.Complete();
}

没有传播取消的规定。如果块在故障状态下完成,异常将传播到链接块。在任何其他情况下,链接块仅被标记为完成。

据我所知,块转换为取消状态的唯一情况是在其创建时提供的 CancellationToken 被取消。例如,如果您尝试从其 lambda 函数内部 throw new OperationCanceledException(),什么也不会发生,处理的消息将被忽略。