How to properly parallelize worker tasks?
Consider the following code snippet and note the difference in total run time between setting numberTasksToSpinOff to 1 versus 3, 4, or more (depending on the thread resources on your machine). I notice longer run times when spinning off more tasks.
I intentionally pass the same data collection to every worker instance, so each worker task reads it concurrently. My assumption was that as long as those operations are only reads or enumerations, the tasks could access the shared data structure without blocking each other.
My goal is to spin off multiple tasks that iterate the same shared data structure via read-only operations and finish at roughly the same time, regardless of how many tasks are spun off.
EDIT: Please see the second code snippet, where I implement Parallel.ForEach() and give each worker its own data set, so the same data structure is no longer accessed by different tasks/threads. However, I still see unacceptable overhead.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine($"Entry Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        //run
        var task = Task.Run(async () =>
        {
            Console.WriteLine($"Entry RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            await RunMe();
            Console.WriteLine($"Exit RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
        });
        task.Wait();

        Console.WriteLine($"Exit Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static async Task RunMe()
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 6;
        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var tasks = new List<Task>();
        var workers = new List<Worker>();

        //structure workers (every worker shares the same dataPoints list)
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i, dataPoints));
        }

        //start timer
        watch.Restart();

        //spin off tasks
        foreach (var worker in workers)
        {
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine($"Entry WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                worker.DoSomeWork();
                Console.WriteLine($"Exit WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            }));
        }

        //completion tasks
        await Task.WhenAll(tasks);

        //stop timer
        watch.Stop();
        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    private List<double> _data;

    public Worker(int workerId, List<double> data)
    {
        WorkerId = workerId;
        _data = data;
    }

    public void DoSomeWork()
    {
        //for every element, copy the remaining suffix of the shared list into a new list
        var indexPos = 0;
        foreach (var dp in _data)
        {
            var subSet = _data.Skip(indexPos).Take(_data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}
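A rough estimate of what the loop above costs (this note and the sketch below are not part of the original snippet): on iteration indexPos, Skip(indexPos).Take(_data.Count - indexPos).ToList() copies the remaining _data.Count - indexPos elements into a fresh list, so one DoSomeWork() call copies about n·(n+1)/2 elements in total. A back-of-the-envelope sketch of that number, runnable as a top-level statement or inside any method:

// Approximate allocation/copy cost of one DoSomeWork() call, assuming n = 20000 as above.
int n = 20000;
long copiedElements = (long)n * (n + 1) / 2;          // 200,010,000 copied doubles
long copiedBytes = copiedElements * sizeof(double);    // roughly 1.6 GB allocated and copied per worker
Console.WriteLine($"{copiedElements:N0} elements, {copiedBytes / (1024.0 * 1024 * 1024):F1} GiB per worker");

With several workers doing this concurrently, the run is likely dominated by allocation, copying and garbage collection rather than by computation, which is one plausible reason the wall-clock time grows with the task count even though the reads themselves do not block.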
Second code snippet:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 1;
        var numberItems = 20000;
        //var random = new Random((int)DateTime.Now.Ticks);
        //var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var workers = new List<Worker>();

        //structure workers (each worker now builds its own private data set)
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i));
        }

        //start timer
        watch.Restart();

        //parallel work
        if (workers.Any())
        {
            var processorCount = Environment.ProcessorCount;
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = processorCount };
            Parallel.ForEach(workers, parallelOptions, DoSomeWork);
        }

        //stop timer
        watch.Stop();
        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static void DoSomeWork(Worker worker)
    {
        Console.WriteLine($"WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
        var indexPos = 0;
        foreach (var dp in worker.Data)
        {
            var subSet = worker.Data.Skip(indexPos).Take(worker.Data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    public List<double> Data { get; set; }

    public Worker(int workerId)
    {
        WorkerId = workerId;
        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        Data = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
    }
}
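One way to narrow down where the extra time goes (a diagnostic sketch, not part of the snippet above; it only adds a Stopwatch around the existing loop) is to time each DoSomeWork call individually. If individual workers get slower as more of them run concurrently, the cost is contention for shared resources such as memory bandwidth and garbage collection rather than the cost of spinning up the tasks themselves:

private static void DoSomeWork(Worker worker)
{
    // Per-worker timing: same loop as above, plus a Stopwatch so each
    // worker's own duration can be compared against the total run time.
    var workerWatch = Stopwatch.StartNew();

    var indexPos = 0;
    foreach (var dp in worker.Data)
    {
        var subSet = worker.Data.Skip(indexPos).Take(worker.Data.Count - indexPos).ToList();
        indexPos++;
    }

    workerWatch.Stop();
    Console.WriteLine($"WorkerId: {worker.WorkerId} finished in {workerWatch.ElapsedMilliseconds} ms " +
                      $"on Thread Id: {Thread.CurrentThread.ManagedThreadId}");
}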
Have you had a look at parallel tasks? Then you could do something like this.
For example:
if (workers.Any())
{
    var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
    Parallel.ForEach(workers, parallelOptions, DoSomeWork);
}

private static void DoSomeWork(Worker worker)
{
}
Note: the following answer is based on testing and observation, not on authoritative knowledge.
The more tasks you spin off, the more overhead you generate, and so the total execution time increases. But if you look at it from another angle, you will see that the number of "data points" actually processed increases with the number of tasks you spin up (until you hit the limit of the available hardware threads):
The following values were generated on my machine (4C/8T), with 10,000 points per list:
- 1 worker -> 1891 ms -> 5288 p/s
- 2 workers -> 1921 ms -> 10411 p/s
- 4 workers -> 2670 ms -> 14981 p/s
- 8 workers -> 4871 ms -> 16423 p/s
- 12 workers -> 7449 ms -> 16109 p/s
There you can see that the amount of data processed increases significantly until I reach my "core limit", then still increases noticeably until I reach my "thread limit", but after that it falls off again because the overhead grows and there are no additional hardware resources left to use.
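The p/s (points per second) column follows directly from the measurements: total points processed equals workers × points per list, divided by the elapsed time in seconds. A minimal sketch of that calculation (the variable names are mine; only the numbers come from the table above):

// Points-per-second throughput as used in the table above,
// e.g. 2 workers: 2 * 10000 / 1.921 s ≈ 10411 p/s.
int workersCount = 2;
int pointsPerList = 10000;
long elapsedMilliseconds = 1921;           // measured total run time
double pointsPerSecond = workersCount * pointsPerList / (elapsedMilliseconds / 1000.0);
Console.WriteLine($"{pointsPerSecond:F0} p/s");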