TPL 数据流管道完成未从等待中返回

TPL Dataflow Pipeline completion not returning from wait

即使所有数据都已处理并显示在控制台上,我的管道仍未注册为完整。我将其设置为等待完成,但它永远不会完成并且不允许该方法 return。

    TransformBlock<string, CompanyInfo> GetCompanyInfo;
    TransformBlock<string, List<Dividend>> GetDividendReports;
    TransformBlock<string, KeyStats> GetKeyStatInfo;
    TransformBlock<string, List<Interval>> GetIntervalReports;
    TransformBlock<List<Interval>, List<decimal>> GetChangesOverInterval;
    BroadcastBlock<string> broadcastSymbol;
    TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string> GenerateXmlString;
    ActionBlock<string> GenerateCompleteReport;
    CancellationTokenSource cancellationTokenSource;


    public Task StartPipeline()
    {
        cancellationTokenSource = new CancellationTokenSource();

        ExecutionDataflowBlockOptions executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationTokenSource.Token,
            MaxDegreeOfParallelism = MAXPARA
        };

        broadcastSymbol = new BroadcastBlock<string>(symbol => symbol);
        var joinblock = new JoinBlock<List<decimal>, List<Dividend>, KeyStats>(new GroupingDataflowBlockOptions { Greedy = false });

        GetCompanyInfo = new TransformBlock<string, CompanyInfo>(symbol =>
        {
            return RetrieveCompanyInfo(symbol);
        }, executionDataflowBlockOptions);

        GetDividendReports = new TransformBlock<string, List<Dividend>>(symbol =>
        {
            return RetrieveDividendInfo(symbol);
        }, executionDataflowBlockOptions);

        GetKeyStatInfo = new TransformBlock<string, KeyStats>(symbol =>
        {
            return RetrieveKeyStats(symbol);
        }, executionDataflowBlockOptions);

        GetIntervalReports = new TransformBlock<string, List<Interval>>(symbol =>
        {
            return RetrieveIntervals(symbol, 30);
        }, executionDataflowBlockOptions);

        GetChangesOverInterval = new TransformBlock<List<Interval>, List<decimal>>(intervals =>
        {
            return ConstructIntervalReport(intervals);
        }, executionDataflowBlockOptions);

        GenerateXmlString = new TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string>(tup =>
        {
            var ReportObj = new Report
            {
                changeIntervals = tup.Item1,
                dividends = tup.Item2,
                keyStats = tup.Item3
            };

            XmlSerializer ser = new XmlSerializer(typeof(Report));
            var stringWriter = new StringWriter();
            ser.Serialize(stringWriter, ReportObj);
            return stringWriter.ToString();

        }, executionDataflowBlockOptions);

        GenerateCompleteReport = new ActionBlock<string>(xml =>
        {
            var str = Path.GetRandomFileName().Replace(".", "") + ".xml";
            File.WriteAllText(str, xml);
            Console.WriteLine("Finished File");
        }, executionDataflowBlockOptions);

        var options = new DataflowLinkOptions { PropagateCompletion = true };

        var buffer = new BufferBlock<string>();
        buffer.LinkTo(broadcastSymbol);

        //Broadcasts the symbol
        broadcastSymbol.LinkTo(GetIntervalReports, options);
        broadcastSymbol.LinkTo(GetDividendReports, options);
        broadcastSymbol.LinkTo(GetKeyStatInfo, options);
        //Second teir parallel 
        GetIntervalReports.LinkTo(GetChangesOverInterval, options);
        //Joins the parallel blocks back together
        GetDividendReports.LinkTo(joinblock.Target2, options);
        GetKeyStatInfo.LinkTo(joinblock.Target3, options);
        GetChangesOverInterval.LinkTo(joinblock.Target1, options);

        joinblock.LinkTo(GenerateXmlString, options);
        GenerateXmlString.LinkTo(GenerateCompleteReport, options);

        buffer.Post("F");
        buffer.Post("AGFS");
        buffer.Post("BAC");
        buffer.Post("FCF");

        buffer.Complete();

        GenerateCompleteReport.Completion.Wait(cancellationTokenSource.Token);

    }

我不确定为什么它没有从管道中 return 发出异常或完成。当程序运行时,它会显示正在创建和停止的所有文件,但在等待完成后没有代码执行。 PropagateCompletion 难道不应该让块知道它们何时完成它们的动作或转换吗?

您没有将 link 选项传递给您的 BufferBlock,因此不会传播完成。另一方面,只有一个 linked 块将从您的 BroadcastBlock 接收完成。如果你想等待所有三个 linked 块,你必须自己明确地处理它。

此外,由于该方法已经 return 为 Task,因此无需 return 为 Task.CompletedTask,您可以简单地使用 async/sawait 而不是阻塞 .Wait()。您希望 await 调用此方法与 null 有什么关系?

if (GenerateCompleteReport.Completion.IsCompletedSuccessfully) { return Task.CompletedTask;  }

        return null;

相反,您可以:

await enerateCompleteReport.Completion

问题最终没有将传播链接到缓冲区块,这不允许任何块接收完成事件。此外,我需要让来自广播的所有接收块都接收到完整状态,因为我不知道广播块只会将完整事件发送到其中一个链接块。

添加这些更改后,管道按我预期的方式工作。

    readonly string _baseUrl = "https://api.iextrading.com/1.0/";
    const int MAXPARA = 2;

    TransformBlock<string, CompanyInfo> GetCompanyInfo;
    TransformBlock<string, List<Dividend>> GetDividendReports;
    TransformBlock<string, KeyStats> GetKeyStatInfo;
    TransformBlock<string, List<Interval>> GetIntervalReports;
    TransformBlock<List<Interval>, List<decimal>> GetChangesOverInterval;
    BroadcastBlock<string> broadcastSymbol;
    TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string> GenerateXmlString;
    ActionBlock<string> GenerateCompleteReport;
    CancellationTokenSource cancellationTokenSource;

    public void StartPipeline()
    {

        //Add cancelation to the pipeline
        cancellationTokenSource = new CancellationTokenSource();

        ExecutionDataflowBlockOptions executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationTokenSource.Token,
            MaxDegreeOfParallelism = MAXPARA
        };

        broadcastSymbol = new BroadcastBlock<string>(symbol => symbol);
        var joinblock = new JoinBlock<List<decimal>, List<Dividend>, KeyStats>(new GroupingDataflowBlockOptions { Greedy = false });

        GetCompanyInfo = new TransformBlock<string, CompanyInfo>(symbol =>
        {
            return RetrieveCompanyInfo(symbol);
        }, executionDataflowBlockOptions);

        GetDividendReports = new TransformBlock<string, List<Dividend>>(symbol =>
        {
            return RetrieveDividendInfo(symbol);
        }, executionDataflowBlockOptions);

        GetKeyStatInfo = new TransformBlock<string, KeyStats>(symbol =>
        {
            return RetrieveKeyStats(symbol);
        }, executionDataflowBlockOptions);

        GetIntervalReports = new TransformBlock<string, List<Interval>>(symbol =>
        {
            return RetrieveIntervals(symbol, 30);
        }, executionDataflowBlockOptions);

        GetChangesOverInterval = new TransformBlock<List<Interval>, List<decimal>>(intervals =>
        {
            return ConstructIntervalReport(intervals);
        }, executionDataflowBlockOptions);

        GenerateXmlString = new TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string>(tup =>
        {
            var ReportObj = new Report
            {
                changeIntervals = tup.Item1,
                dividends = tup.Item2,
                keyStats = tup.Item3
            };

            XmlSerializer ser = new XmlSerializer(typeof(Report));
            var stringWriter = new StringWriter();
            ser.Serialize(stringWriter, ReportObj);
            return stringWriter.ToString();

        }, executionDataflowBlockOptions);

        GenerateCompleteReport = new ActionBlock<string>(xml =>
        {
            var str = Path.GetRandomFileName().Replace(".", "") + ".xml";
            File.WriteAllText(str, xml);
            Console.WriteLine("Finished File");
        }, executionDataflowBlockOptions);

        var options = new DataflowLinkOptions { PropagateCompletion = true };

        var buffer = new BufferBlock<string>();
        buffer.LinkTo(broadcastSymbol, options);

        //Need to make sure all data is recieved for each linked block
        //Broadcast block only sends completion notice to one of the linked blocks
        broadcastSymbol.Completion.ContinueWith(tsk =>
        {
            if(!tsk.IsFaulted)
            {
                GetIntervalReports.Complete();
                GetDividendReports.Complete();
                GetKeyStatInfo.Complete();
            }
            else
            {
                ((IDataflowBlock)GetIntervalReports).Fault(tsk.Exception);
                ((IDataflowBlock)GetDividendReports).Fault(tsk.Exception);
                ((IDataflowBlock)GetKeyStatInfo).Fault(tsk.Exception);
            }
        });


        //Broadcasts the symbol
        broadcastSymbol.LinkTo(GetIntervalReports, options);
        broadcastSymbol.LinkTo(GetDividendReports, options);
        broadcastSymbol.LinkTo(GetKeyStatInfo, options);
        //Second teir parallel 
        GetIntervalReports.LinkTo(GetChangesOverInterval, options);
        //Joins the parallel blocks back together
        GetDividendReports.LinkTo(joinblock.Target2, options);
        GetKeyStatInfo.LinkTo(joinblock.Target3, options);
        GetChangesOverInterval.LinkTo(joinblock.Target1, options);

        joinblock.LinkTo(GenerateXmlString, options);
        GenerateXmlString.LinkTo(GenerateCompleteReport, options);

        buffer.Post("F");
        buffer.Post("AGFS");
        buffer.Post("BAC");
        buffer.Post("FCF");


        buffer.Complete();

        GenerateCompleteReport.Completion.Wait(cancellationTokenSource.Token);

    }