How to properly parallelize worker tasks?

Consider the following code snippet, and note the difference in total run time when numberTasksToSpinOff is set to 1 versus 3, 4, or more (depending on the thread resources of your machine). When spinning off more tasks, I notice longer run times.

I am intentionally passing the same data collection to every worker instance, so each worker task reads it concurrently. My assumption is that tasks can access a shared data structure without blocking as long as those operations are only reads or enumerations.
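As a side note, that read-only assumption holds as long as nothing mutates the list while it is being enumerated. A minimal, standalone sketch (the class name `ReadOnlySharedReadDemo` is made up for illustration, not part of the original code):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class ReadOnlySharedReadDemo
{
    static void Main()
    {
        //one shared list, never mutated after this point
        var shared = Enumerable.Range(1, 1000).Select(x => (double)x).ToList();

        //several tasks enumerate the same list concurrently; because no task
        //writes to it, no locking is needed and every sum comes out identical
        var sums = Task.WhenAll(Enumerable.Range(1, 4)
            .Select(_ => Task.Run(() => shared.Sum()))).Result;

        Console.WriteLine(string.Join(", ", sums)); //four identical values
    }
}
```

If any task modified the list during enumeration, the other tasks could throw an InvalidOperationException ("Collection was modified"), which is why the sketch freezes the data before the tasks start.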

My goal is to spin off multiple tasks that iterate over the same shared data structure with read operations and finish at roughly the same time, regardless of how many tasks are spun off.

Edit: Please see the second code snippet, where I use Parallel.ForEach() and give each worker its own data set, so the same data structure is no longer accessed by different tasks/threads. Yet I still see unacceptable overhead.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine($"Entry Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        //run
        var task = Task.Run(async () =>
        {
            Console.WriteLine($"Entry RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            await RunMe();

            Console.WriteLine($"Exit RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
        });

        task.Wait();

        Console.WriteLine($"Exit Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static async Task RunMe()
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 6;
        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var tasks = new List<Task>();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i, dataPoints));
        }

        //start timer
        watch.Restart();

        //spin off tasks
        foreach (var worker in workers)
        {
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine($"Entry WorkerId: {worker.WorkerId} -> new task spun off within Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                worker.DoSomeWork();
                Console.WriteLine($"Exit WorkerId: {worker.WorkerId} -> new task spun off within Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            }));

        }

        //completion tasks
        await Task.WhenAll(tasks);

        //stop timer
        watch.Stop();

        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    private List<double> _data;

    public Worker(int workerId, List<double> data)
    {
        WorkerId = workerId;
        _data = data;
    }

    public void DoSomeWork()
    {
        //deliberately heavy read-only workload: copy the remaining tail of the list on every iteration
        var indexPos = 0;

        foreach (var dp in _data)
        {
            var subSet = _data.Skip(indexPos).Take(_data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

Second code snippet:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 1;
        var numberItems = 20000;
        //var random = new Random((int)DateTime.Now.Ticks);
        //var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i));
        }

        //start timer
        watch.Restart();

        //parallel work

        if (workers.Any())
        {
            var processorCount = Environment.ProcessorCount;
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = processorCount };
            Parallel.ForEach(workers, parallelOptions, DoSomeWork);
        }

        //stop timer
        watch.Stop();
        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static void DoSomeWork(Worker worker)
    {
        Console.WriteLine($"WorkerId: {worker.WorkerId} -> new task spun off within Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        var indexPos = 0;

        foreach (var dp in worker.Data)
        {
            var subSet = worker.Data.Skip(indexPos).Take(worker.Data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    public List<double> Data { get; set; }

    public Worker(int workerId)
    {
        WorkerId = workerId;

        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        Data = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();

    }
}

Have you looked at Parallel Tasks? Then you could do something like this.

For example:

if (workers.Any())
{
    var parallelOptions = new ParallelOptions {MaxDegreeOfParallelism = Environment.ProcessorCount};
    Parallel.ForEach(workers, parallelOptions, DoSomeWork);
}

private static void DoSomeWork(Worker worker)
{
}

Note: The following answer is based on tests and observations, not authoritative knowledge.

The more tasks you spin off, the more overhead you generate, so total execution time rises. But if you look at it from another angle, you will see that the number of "data-points" actually processed increases with each additional task you spin up (until you hit the limit of available hardware threads):

The following values were generated on my machine (4C/8T) with 10000 points per list:


  • 1 worker -> 1891 ms -> 5288 p/s
  • 2 workers -> 1921 ms -> 10411 p/s
  • 4 workers -> 2670 ms -> 14981 p/s
  • 8 workers -> 4871 ms -> 16423 p/s
  • 12 workers -> 7449 ms -> 16109 p/s

There you can see that the amount of data processed increases significantly until I hit my "core-limit", then still increases noticeably until I hit my "thread-limit", but after that it drops again, because the overhead keeps growing while no more hardware resources are available.
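The p/s figures above are simply the total number of points processed divided by the elapsed time. A small standalone sketch (the class name `ThroughputCheck` and the hard-coded timings are taken from my measurements above, not from the questioner's code) that reproduces them:

```csharp
using System;

class ThroughputCheck
{
    static void Main()
    {
        //points per worker list, as in the measurements above
        const int pointsPerWorker = 10000;

        //(worker count, measured elapsed milliseconds) from my 4C/8T machine
        var runs = new (int workers, int ms)[]
            { (1, 1891), (2, 1921), (4, 2670), (8, 4871), (12, 7449) };

        foreach (var (workers, ms) in runs)
        {
            //throughput = total points processed / elapsed seconds
            var pointsPerSecond = workers * pointsPerWorker / (ms / 1000.0);
            Console.WriteLine($"{workers} workers -> {ms} ms -> {pointsPerSecond:F0} p/s");
        }
    }
}
```

This makes the pattern easy to see: total throughput, not per-task run time, is what scales until the hardware limit is reached.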