TPL C# Windows 服务 - 有什么问题吗?

TPL C# Windows Service - anything wrong?

您好使用 TPL 编写 Windows 服务,要求是 Windows 服务应每隔几分钟或几秒 运行 并并行执行 4 个任务。我已经实现了以下代码,但有疑问

Task[] taskRunners = new Task[Utility.conTaskRunners];

        Log("Stating Initiate Tasks::" + DateTime.Now);

        Log("Calling InitiateTaskRunner Tasks::" + DateTime.Now);

        taskRunners[0] = Task.Factory.StartNew(() => InitiateTaskRunner());
        taskRunners[1] = Task.Factory.StartNew(() => ScanTaskRunner());
        taskRunners[2] = Task.Factory.StartNew(() => ConvertTaskRunner());
        taskRunners[3] = Task.Factory.StartNew(() => ConvertedScanRunner());
        Task.WaitAll(taskRunners);
        Thread.Sleep(12000);

        InitiateTasks();

InitiateTaskRunner 如下所示

 public void InitiateTaskRunner()
    {
        Log("Begening InitiateTaskRunner ::" + DateTime.Now);           
        string[] subDirs = Directory.GetDirectories(Utility.inputPath);
        if (subDirs.Length > 0) //files eixsts go and process it
        {
            Parallel.ForEach(subDirs, dir =>
            {
                InitiateTaskWorker(dir);
            });
        }           
        //BEFORE I SET signal SHOULD this wait as above is PARALLEL.FOREACH?
        _waitOnOne.Set(); //set signal for successor after finish
        Log("Ending InitiateTaskRunner ::" + DateTime.Now);
    }

其他 3 个 TaskRunner 也将是;基本上每个任务 运行ner 都会为每个子目录调用 TaskWorker 并异步处理文件,并 eatch TASK Runner 和每个 Task Worker 将是独立和并发的。在 TaskWorker 完成其对文件的处理后,文件从一个 taskWorker 到另一个 Task worker 的位置将被移动,下一个 taskworker 将拾取但没有触发从一个 task worker 到另一个 task worker。

这种方法的问题是,有时第 2、3 和 4 个任务 运行 人可能不做任何事情就退出,而第一个任务 运行 人仍在处理并推送到任务 运行ner#2 位置等等。来自任务 运行ners 位置 2、3、4 的文件将在下一次迭代中处理。

有更好的设计吗?任何对 TPL 中异常处理的帮助都会很棒。

public void ScanTaskRunner()
    {
        Log("Begening ScanTaskRunner ::" + DateTime.Now);            
        string[] subDirs = Directory.GetDirectories(Utility.scanPath);
        if (subDirs.Length > 0)  //files exists so proceed
        {
            Parallel.ForEach(subDirs, dir =>
            {
                ScanTaskWorker(dir, false);
            });
        }
        else //no files hence wait for prcedent TASK to complete
        {
            _waitOnOne.WaitOne();
            Parallel.ForEach(Directory.GetDirectories(Utility.scanPath), dir =>
            {
                ScanTaskWorker(dir, false);
            });
        }
        _waitOnTwo.Set(); //Go successor if waiting for this
        Log("Ending ScanTaskRunner ::" + DateTime.Now);
    }
public void ConvertTaskRunner()
    {
       Log("Begening ConvertTasktRunner ::" + DateTime.Now);
        string[] subDirs = Directory.GetDirectories(Utility.convertPath);
        if (subDirs.Length > 0)  //files exists so process as usual
        {
            Parallel.ForEach(subDirs, dir =>
            {
                ConvertTaskWorker(dir);
            });
        }
        else  //no files hence wait for precedent TASK to complete
        {
            _waitOnTwo.WaitOne();
            Parallel.ForEach(Directory.GetDirectories(Utility.scanPath), dir =>
            {
                ScanTaskWorker(dir, false);
            });
        }
        _waitOnThree.Set();  //Successor is good to go if waiting
        Log("Ending ConvertTasktRunner ::" + DateTime.Now);
    }
 public void ConvertedScanRunner()
    {
        Log("Begening ConvertedScanRunner ::" + DateTime.Now);
        string[] subDirs = Directory.GetDirectories(Utility.convrtedScanPath);
        if (subDirs.Length > 0)  //files avaiable process as usual
        {
            Parallel.ForEach(subDirs, dir =>
            {
                ConvertedScanTaskWorker(dir);
            });
        }
        else  //no files wait until precendent TASK completes
        {
            _waitOnThree.WaitOne();
            Parallel.ForEach(Directory.GetDirectories(Utility.scanPath), dir =>
            {
                ScanTaskWorker(dir, false);
            });
        }
        Log("Ending ConvertedScanRunner ::" + DateTime.Now);
    }

请参阅 Best way to do a task looping in Windows Service 了解启动任务并让它们继续进行的好方法。

就异常处理等而言,您可能希望等待您拥有的所有任务 运行 并捕获并处理 AggregateException。任何抛出异常的任务都将 IsFaulted 为真。在每个任务中使用 ThrowIfCancellationRequested 与其他人合作并参与被取消。那些合作的任务将 IsCanceled 属性 为真。

    public void Run(CancellationTokenSource cts)
    {
        _cts = cts;
        Task[] tasks = new Task[4];

        try
        {
            tasks[0] = Task.Factory.StartNew(() => PhaseOne(), cts.Token);
            tasks[1] = Task.Factory.StartNew(() => PhaseTwo(), cts.Token);
            tasks[2] = Task.Factory.StartNew(() => PhaseThree(), cts.Token);
            tasks[3] = Task.Factory.StartNew(() => PhaseFour(), cts.Token);

            Task.WaitAll(tasks);
        }
        catch (AggregateException ex)
        {
            // Handle exceptions
            foreach(Exception inner in ex.InnerExceptions)
            {
                // do something
            }
        }

        if (tasks.Any(t => t.IsFaulted))
        {
            // do something
        }
        if (tasks.Any(t => t.IsCanceled))
        {
            // do something
        }
    }

至于保持任务同步并了解何时处理文件或何时完成另一个阶段的问题,您可以使用 EventWaitHandle(尽管有多种方法可以做到这一点) .等待句柄可能比其他一些替代方法(如自旋锁)慢一点,因为它进入内核,但对于等待文件处理,使用它可能是个好主意。

此处的代码显示了 PhaseTwo 的外观。 PhaseOne 看起来本质上是一样的,但是您会为一个不同的处理委托等使用等待句柄。

    private EventWaitHandle _waitOnTwo = new EventWaitHandle(false, EventResetMode.AutoReset);
    public void PhaseTwo()
    {
        try
        {
            string phaseTwoDir = "C:\PhaseTwo";
            string phaseThreeDir = "C:\PhaseThree";

            Action<string> processFiles = new Action<string>((file) => { File.Move(file, Path.Combine(phaseThreeDir, Path.GetFileName(file))); _waitOnTwo.Set(); });
            Func<bool> continueCondition = new Func<bool>(() => { return (Directory.GetFiles(phaseTwoDir).Count() > 0 || _waitOnOne.WaitOne(60000)); });

            // process files
            Process(phaseTwoDir, processFiles, continueCondition);
        }
        finally
        {
            _waitOnTwo.Set();
        }
    }

    private void Process(string directory, Action<string> processFileWork, Func<bool> continueCondition)
    {
        while (continueCondition())
        {
            foreach (string file in System.IO.Directory.GetFiles(directory))
            {
                // if canceled, throw
                _cts.Token.ThrowIfCancellationRequested();

                // process file
                processFileWork(file);
            }

        }
    }