在低于正常优先级运行的 Windows 服务中停止 Parallel.ForEach

Stopping Parallel.ForEach in Windows Service with below normal priority

我的 Windows 服务中有一段 Parallel.ForEach 代码。如果 ParallelOptions.MaxDegreeOfParallelism 设置为 -1,它会占用我的大部分 CPU,而且停止服务需要长达半分钟。某个本应接收到服务停止信号的内部控制器线程因为得不到处理器时间而处于饥饿状态。我已将进程优先级设置为低于正常值,但这可能与此处的问题无关。

如何缩短所有线程都忙时停止服务的时间?

我想暂时降低线程池中线程的优先级,因为我没有任何异步代码,但网上说这是个坏主意,所以在这里问 "proper" 方式。

在所有情况下,OnStart 和 OnStop 所在的线程(OS 线程和 .NET 线程)都是不同的。此外,如果停止时间非常长,那么最终调用 OnStop 的 OS 线程有时是一个新线程,此前不会在日志中出现过。

要构建此代码,请创建新的 Windows 服务项目,从设计器中添加 ProjectInstaller class,将帐户更改为 LocalService,然后使用 InstallUtil 安装一次。确保 LocalService 可以写入 C:\Temp.

/// <summary>
/// Windows service that hashes every file in the system directory with
/// <see cref="Parallel"/> and stops the loop when the service is asked to stop.
/// </summary>
public partial class Service1 : ServiceBase
{
    private ManualResetEvent stopEvent = new ManualResetEvent(false);
    private Task mainTask;
    private StreamWriter writer = File.AppendText(@"C:\Temp\Log.txt");

    // StreamWriter is not thread-safe and Log is called from many loop threads at once.
    private readonly object logLock = new object();

    public Service1()
    {
        InitializeComponent();

        writer.AutoFlush = true;
    }

    /// <summary>Logs the start and launches the hashing work on a background task.</summary>
    /// <param name="args">Service start arguments (not used).</param>
    protected override void OnStart(string[] args)
    {
        Log("--------------");
        Log("OnStart");

        mainTask = Task.Run(new Action(Run));
    }

    /// <summary>Signals the loop to stop and blocks until the background task has finished.</summary>
    protected override void OnStop()
    {
        Log("OnStop");
        stopEvent.Set();

        mainTask.Wait();
        Log("--------------");
    }

    /// <summary>Appends a timestamped line, tagged with the managed thread id, to the log file.</summary>
    /// <param name="line">Text to log.</param>
    private void Log(string line)
    {
        string entry = String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
            DateTime.Now, Thread.CurrentThread.ManagedThreadId, line);

        lock (logLock)
        {
            writer.WriteLine(entry);
        }
    }

    /// <summary>
    /// Hashes the sorted bytes of every file in the system directory in parallel.
    /// Each iteration first checks <c>stopEvent</c> and stops the loop when it is set.
    /// </summary>
    private void Run()
    {
        try
        {
            var parallelOptions = new ParallelOptions();
            parallelOptions.MaxDegreeOfParallelism = -1;

            // HashAlgorithm instances are not thread-safe, so every worker task gets its
            // own SHA256 through the thread-local overload instead of sharing one.
            Parallel.ForEach(
                Directory.EnumerateFiles(Environment.SystemDirectory),
                parallelOptions,
                () => SHA256.Create(),
                (fileName, parallelLoopState, sha) =>
                {
                    if (stopEvent.WaitOne(0))
                    {
                        Log("Stop requested");
                        parallelLoopState.Stop();
                        return sha;
                    }

                    try
                    {
                        var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
                        Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));
                    }
                    catch (Exception ex)
                    {
                        // A single unreadable file must not end the whole loop.
                        Log(String.Format("file={0}, exception={1}", fileName, ex.Message));
                    }

                    return sha;
                },
                sha => sha.Dispose());
        }
        catch (Exception ex)
        {
            Log(String.Format("exception={0}", ex.Message));
        }
    }
}

在 Parallel.ForEach 中,您读取文件的所有字节,然后使用 LINQ 对它们进行排序。这样效率不高。请尝试使用 Array.Sort。对于 25 MB 的文件,这可以快约 85%。

Array.Sort 2230 ms
OrderBy 14650 ms

并且因为 OnStop 方法等待任何已经开始的迭代结束,它可以更快地停止您的服务。

// Load the whole file, sort its bytes in place, then hash the sorted buffer.
byte[] sortedBytes = File.ReadAllBytes(fileName);
Array.Sort(sortedBytes);
var hash = sha.ComputeHash(sortedBytes);

在您的 parallelOptions 对象中有一个 CancellationToken 属性。您可以新建一个 CancellationTokenSource,并把它的 Token 赋给该属性。然后在您的并行循环中调用 parallelOptions.CancellationToken.ThrowIfCancellationRequested()。取消之后,该调用会抛出异常,从而使您的线程结束工作。

详细示例请看: How to: Cancel a Parallel.For or ForEach Loop

编辑:如果您希望您的服务停止得更快,那么您可能还需要取消 ComputeHash 函数的执行。一旦您的线程处于该调用中,就无法取消。所以解决方案是在循环中使用 TransformBlock 方法进行块转换。在该循环期间,您需要检查您的 CancellationToken,或者在您的情况下检查手动重置事件对象。如果您需要有关如何操作的指导,请查看此答案:。他们出色地展示了如何使用 MD5 算法进行块转换,但它可以直接移植到 SHA256 算法。

问题是当您发出停止命令时,您的每个线程都需要先看到停止命令,然后进行同步,才能真正停止。这意味着停止的速度取决于您 最慢 的那次哈希计算。如果我是您,我会重写代码中计算哈希的部分,让它以迭代方式计算哈希,而不是调用内置函数。这样,您就可以在哈希计算的中途停止。

SHA256 中有名为 TransformBlock 和 TransformFinalBlock 的方法可以做到这一点。

我为自己的个人项目编写的一些代码示例是:

// Incremental hashing loop: reads the stream in 4096-byte chunks and feeds the hash
// one chunk behind the read position, so that the last chunk can be passed to
// TransformFinalBlock. The check at the top of every iteration is the point where
// the work can be abandoned in the middle of a file.
// NOTE(review): buffer, bytesRead, totalBytesRead, size, stream and hashAlgorithm are
// declared outside this fragment; presumably the first chunk is read before the loop
// is entered — confirm, otherwise oldBuffer has no data on the first pass.
do
{
    // SHOULD_STOP / STOP look like placeholders for the caller's own stop check — confirm.
    if(SHOULD_STOP)
        STOP();
    // Keep the chunk read on the previous pass; it is the one hashed below.
    oldBytesRead = bytesRead;
    oldBuffer = buffer;

    buffer = new byte[4096];
    bytesRead = stream.Read(buffer, 0, buffer.Length);

    totalBytesRead += bytesRead;

    if (bytesRead == 0)
    {
        // Nothing more was read: the previous chunk was the last one, so finalize with it.
        hashAlgorithm.TransformFinalBlock(oldBuffer, 0, oldBytesRead);
    }
    else
    {
        hashAlgorithm.TransformBlock(oldBuffer, 0, oldBytesRead, oldBuffer, 0);
    }

    // Percentage of size read so far; the value is not used within this fragment.
    int progress = (int)((double)totalBytesRead * 100 / size);

} while (bytesRead != 0);

// Format the finished hash as a lower-case hex string without separators.
return BitConverter.ToString(hashAlgorithm.Hash).Replace("-", "").ToLowerInvariant();

此代码将在一两秒内停止服务,而已经在计算的线程只有在完成实际工作后才会结束。正如您在服务中看到的那样,OnStop 方法立即接收到信号。但是,TaskManager 显示与服务关联的进程只有在消费线程全部完成后才会停止。

这使用一个单独的线程正在填充的字符串(路径)的 BlockingCollection。并且有一些低优先级的线程会消耗字符串。

/// <summary>
/// Windows service that hashes the files of the system directory with a
/// producer/consumer pipeline: one thread enqueues file paths into a bounded
/// <see cref="BlockingCollection{T}"/> and <c>nbTreads</c> lowest-priority threads
/// dequeue and hash them.
/// </summary>
public partial class Service1 : ServiceBase
{
    private StreamWriter writer = File.AppendText(@"C:\temp\Log.txt");

    const int nbTreads = 30;
    BlockingCollection<string> dataItems;

    // Cancelled by OnStop; observed by the producer and by every consumer.
    private readonly CancellationTokenSource stopSource = new CancellationTokenSource();
    List<Thread> threads = new List<Thread>();
    Thread threadProd;

    // Serializes writes to the shared StreamWriter, which is not thread-safe.
    private readonly object aLock = new object();

    public Service1()
    {
        InitializeComponent();

        dataItems = new BlockingCollection<string>(nbTreads);

        writer.AutoFlush = true;
    }

    /// <summary>Starts the producer thread and the lowest-priority consumer threads.</summary>
    /// <param name="args">Service start arguments (not used).</param>
    protected override void OnStart(string[] args)
    {
        Log("--------------");
        Log("OnStart");
        threadProd = new Thread(new ThreadStart(ProduireNomFichier));
        threadProd.Start();

        // No warm-up delay is needed: consumers block until an item arrives instead of
        // exiting when they find the collection empty.
        for (int i = 0; i < nbTreads; i++)
        {
            Thread threadRun = new Thread(Run);
            threadRun.Priority = ThreadPriority.Lowest;
            threadRun.Start();
            threads.Add(threadRun);
        }
    }

    /// <summary>
    /// Producer: enqueues every file path of the system directory, blocking while the
    /// bounded collection is full, and marks the collection complete when done or cancelled.
    /// </summary>
    private void ProduireNomFichier()
    {
        try
        {
            foreach (string nomFichier in Directory.EnumerateFiles(Environment.SystemDirectory))
            {
                dataItems.Add(nomFichier, stopSource.Token);
            }
        }
        catch (OperationCanceledException)
        {
            // Stop was requested while waiting for room in the collection.
        }
        finally
        {
            // Lets consumers finish once the remaining items are drained.
            dataItems.CompleteAdding();
        }
    }

    /// <summary>
    /// Requests a cooperative stop of the producer and consumers and returns without
    /// waiting for them; a consumer in the middle of a file finishes that file first.
    /// </summary>
    protected override void OnStop()
    {
        // Replaces Thread.Abort: the token wakes up a producer blocked in Add and
        // consumers blocked waiting for an item.
        stopSource.Cancel();
        Log("OnStop");
        Log("--------------");
    }

    /// <summary>Appends a timestamped line, tagged with the managed thread id, to the log file.</summary>
    /// <param name="line">Text to log.</param>
    private void Log(string line)
    {
        string entry = String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
            DateTime.Now, Thread.CurrentThread.ManagedThreadId, line);

        lock (aLock)
        {
            writer.WriteLine(entry);
        }
    }

    /// <summary>
    /// Consumer: hashes the sorted bytes of each dequeued file until the collection is
    /// completed or a stop is requested. Each consumer owns its SHA256 instance.
    /// </summary>
    private void Run()
    {
        try
        {
            using (var sha = SHA256.Create())
            {
                foreach (string fileName in dataItems.GetConsumingEnumerable(stopSource.Token))
                {
                    try
                    {
                        var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
                        Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));
                    }
                    catch (Exception ex)
                    {
                        // A single unreadable file must not end this consumer.
                        Log(String.Format("file={0}, exception={1}", fileName, ex.Message));
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path: stop was requested.
        }
        catch (Exception ex)
        {
            Log(String.Format("exception={0}", ex.Message));
        }
    }
}

这是一个工作代码。它立即停止。请注意,主要思想来自:SylF.

但我无法给出明确的解释为什么会发生... 更新(在您发表评论后):您找到了原因,它很好地解释了为什么您有这种行为。谢谢!我真的很高兴知道。

虽然这项工作是在低优先级线程中完成的,但您应该不会注意到 CPU 大部分没有工作的机器上有任何额外的延迟。

抱歉,为了做一些测试,我把您的代码示例改得有些乱。但主要思想是更换任务调度器(这似乎并不被推荐)。不过这是我找到的唯一方法。

代码:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace WhosebugQuestionWindowsService1
{
    /// <summary>
    /// Windows service that hashes the files of the system directory with
    /// <see cref="Parallel"/> on a lowest-priority <c>PriorityScheduler</c>, and cancels
    /// the loop through a <see cref="CancellationTokenSource"/> when stopped.
    /// </summary>
    public partial class Service1 : ServiceBase
    {
        private Task mainTask;
        private StreamWriter writer = File.CreateText(@"C:\Temp\Log.txt");     //TAKE CARE - I do not append anymore  ********
        private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        private int count = 0;

        // StreamWriter is not thread-safe and Log is called from many loop threads at once.
        private readonly object logLock = new object();

        public Service1()
        {
            InitializeComponent();

            writer.AutoFlush = true;
        }

        /// <summary>Logs the start and launches the hashing work on a background task.</summary>
        /// <param name="args">Service start arguments (not used).</param>
        protected override void OnStart(string[] args)
        {
            Log("--------------");
            Log("OnStart");

            mainTask = Task.Run(() => Run());
        }

        /// <summary>Logs the current thread count and requests cancellation of the loop.</summary>
        protected override void OnStop()
        {
            using (Process current = Process.GetCurrentProcess())
            {
                Log("OnStop with actual thread count: " + current.Threads.Count);
            }

            cancellationTokenSource.Cancel();
        }

        /// <summary>Appends a timestamped line, tagged with the managed thread id, to the log file.</summary>
        /// <param name="line">Text to log.</param>
        private void Log(string line)
        {
            string entry = String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
                DateTime.Now, Thread.CurrentThread.ManagedThreadId, line);

            lock (logLock)
            {
                writer.WriteLine(entry);
            }
        }

        /// <summary>
        /// Hashes the sorted bytes of every file in the system directory in parallel on
        /// lowest-priority threads, logs per-file and total timings, then closes the log
        /// and kills the process.
        /// </summary>
        private void Run()
        {
            Stopwatch stopWatchTotal = new Stopwatch();
            stopWatchTotal.Start();

            try
            {
                var parallelOptions = new ParallelOptions();
                parallelOptions.MaxDegreeOfParallelism = -1;
                parallelOptions.CancellationToken = cancellationTokenSource.Token;
                // Worker threads get their priority from the scheduler.
                parallelOptions.TaskScheduler = new PriorityScheduler(ThreadPriority.Lowest);

                // HashAlgorithm instances are not thread-safe, so every worker task gets
                // its own SHA256 through the thread-local overload instead of sharing one.
                Parallel.ForEach(
                    Directory.EnumerateFiles(Environment.SystemDirectory),
                    parallelOptions,
                    () => SHA256.Create(),
                    (fileName, parallelLoopState, sha) =>
                    {
                        Stopwatch stopWatch = new Stopwatch();
                        stopWatch.Start();

                        // Use the value returned by the increment; re-reading the field
                        // could observe increments made by other threads.
                        int current = Interlocked.Increment(ref count);

                        if (parallelOptions.CancellationToken.IsCancellationRequested)
                        {
                            Log($"{current}");
                            return sha;
                        }

                        try
                        {
                            var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
                            stopWatch.Stop();
                            Log(FormatTicks(stopWatch.ElapsedTicks));
                            // Interpolated strings are logged directly: passing them through
                            // String.Format throws when a file name contains '{' or '}'.
                            Log($"{current}, {FormatTicks(stopWatch.ElapsedTicks)}, file={fileName}, sillyhash={Convert.ToBase64String(hash)}");
                        }
                        catch (Exception ex)
                        {
                            Log($"{current} file={fileName}, exception={ex.Message}");
                        }

                        return sha;
                    },
                    sha => sha.Dispose());
            }
            catch (Exception ex)
            {
                // Also reached on cancellation: the loop throws OperationCanceledException.
                Log(String.Format("exception={0}", ex.Message));
            }

            stopWatchTotal.Stop();

            Log(FormatTicks(stopWatchTotal.ElapsedTicks));

            // Terminate the process as soon as the loop has ended.
            writer.Close();
            Process.GetCurrentProcess().Kill();
        }

        /// <summary>Formats a <see cref="Stopwatch"/> tick count as a <see cref="TimeSpan"/> string.</summary>
        /// <param name="ticks">Tick count as reported by <see cref="Stopwatch.ElapsedTicks"/>.</param>
        /// <returns>The elapsed time in the default <see cref="TimeSpan"/> format.</returns>
        private string FormatTicks(long ticks)
        {
            // Stopwatch ticks are 1/Stopwatch.Frequency seconds, not the 100 ns ticks
            // TimeSpan uses, so convert before constructing the TimeSpan.
            double timeSpanTicksPerStopwatchTick = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;
            return new TimeSpan((long)(ticks * timeSpanTicksPerStopwatchTick)).ToString();
        }
    }
}

优先级调度程序:(感谢 Roman Starkov 在 Stack Overflow 上的回答,该代码源自 Microsoft 的 Bnaya Eshet)

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace WhosebugQuestionWindowsService1
{
    /// <summary>
    /// Task scheduler that runs its tasks on dedicated background threads created with
    /// a fixed <see cref="ThreadPriority"/>. The threads are started lazily when the
    /// first task is queued, one per processor.
    /// </summary>
    public class PriorityScheduler : TaskScheduler
    {
        public static PriorityScheduler AboveNormal = new PriorityScheduler(ThreadPriority.AboveNormal);
        public static PriorityScheduler BelowNormal = new PriorityScheduler(ThreadPriority.BelowNormal);
        public static PriorityScheduler Lowest = new PriorityScheduler(ThreadPriority.Lowest);

        private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
        private Thread[] _threads;
        private ThreadPriority _priority;
        private readonly int _maximumConcurrencyLevel = Math.Max(1, Environment.ProcessorCount);

        // Guards the one-time creation of the worker threads.
        private readonly object _startGate = new object();

        /// <summary>Creates a scheduler whose worker threads run at <paramref name="priority"/>.</summary>
        /// <param name="priority">Priority assigned to every worker thread.</param>
        public PriorityScheduler(ThreadPriority priority)
        {
            _priority = priority;
        }

        /// <summary>Number of worker threads: the processor count, at least 1.</summary>
        public override int MaximumConcurrencyLevel
        {
            get { return _maximumConcurrencyLevel; }
        }

        /// <summary>Returns the tasks that are queued and not yet taken by a worker.</summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks;
        }

        /// <summary>Queues <paramref name="task"/> and starts the worker threads on first use.</summary>
        /// <param name="task">Task to execute on one of the worker threads.</param>
        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);

            // QueueTask can be called from several threads at once; without the lock two
            // callers could both see _threads == null and start two sets of workers.
            lock (_startGate)
            {
                if (_threads != null)
                {
                    return;
                }

                var workers = new Thread[_maximumConcurrencyLevel];
                for (int i = 0; i < workers.Length; i++)
                {
                    workers[i] = new Thread(() =>
                    {
                        foreach (Task t in _tasks.GetConsumingEnumerable())
                        {
                            TryExecuteTask(t);
                        }
                    });
                    // The format string needs the {0} placeholder for the index to appear.
                    workers[i].Name = string.Format("PriorityScheduler: {0}", i);
                    workers[i].Priority = _priority;
                    workers[i].IsBackground = true;
                    workers[i].Start();
                }

                _threads = workers;
            }
        }

        /// <summary>Always refuses inline execution so that tasks only run on the worker threads.</summary>
        /// <param name="task">Task that was offered for inline execution.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task had already been queued.</param>
        /// <returns>Always <c>false</c>.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false; // we might not want to execute task that should schedule as high or low priority inline
        }
    }
}