为什么简单的多任务在多线程工作时不起作用?
Why simple multi task doesn't work when multi thread does?
var finalList = new List<string>();
var list = new List<int> {1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ................. 999999};
var init = 0;
var limitPerThread = 5;
var countDownEvent = new CountdownEvent(list.Count);
for (var i = 0; i < list.Count; i++)
{
var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
new Thread(delegate()
{
Foo(listToFilter);
countDownEvent.Signal();
}).Start();
init += limitPerThread;
}
//wait all to finish
countDownEvent.Wait();
private static void Foo(List<int> listToFilter)
{
var listDone = Boo(listToFilter);
lock (Object)
{
finalList.AddRange(listDone);
}
}
这不是:
var taskList = new List<Task>();
for (var i = 0; i < list.Count; i++)
{
var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
var task = Task.Factory.StartNew(() => Foo(listToFilter));
taskList.add(task);
init += limitPerThread;
}
//wait all to finish
Task.WaitAll(taskList.ToArray());
这个进程最后至少要创建700个线程。当我 运行 使用线程时,它会工作并创建所有线程。但是 Task
它没有..它似乎不是开始倍数 Tasks
async.
我真的很想知道为什么....有什么想法吗?
编辑
带有 PLINQ 的另一个版本(按照建议)。
var taskList = new List<Task>(list.Count);
Parallel.ForEach(taskList, t =>
{
var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
Foo(listToFilter);
init += limitPerThread;
t.Start();
});
Task.WaitAll(taskList.ToArray());
EDIT2:
public static List<Communication> Foo(List<Dispositive> listToPing)
{
var listResult = new List<Communication>();
foreach (var item in listToPing)
{
var listIps = item.listIps;
var communication = new Communication
{
IdDispositive = item.Id
};
try
{
for (var i = 0; i < listIps.Count(); i++)
{
var oPing = new Ping().Send(listIps.ElementAt(i).IpAddress, 10000);
if (oPing != null)
{
if (oPing.Status.Equals(IPStatus.TimedOut) && listIps.Count() > i+1)
continue;
if (oPing.Status.Equals(IPStatus.TimedOut))
{
communication.Result = "NOK";
break;
}
communication.Result = oPing.Status.Equals(IPStatus.Success) ? "OK" : "NOK";
break;
}
if (listIps.Count() > i+1)
continue;
communication.Result = "NOK";
break;
}
}
catch
{
communication.Result = "NOK";
}
finally
{
listResult.Add(communication);
}
}
return listResult;
}
Task
s 不是多线程。它们 可以 用于此目的,但大多数情况下它们实际上用于相反的用途 - 在单个线程上进行多路复用。
要将任务用于多线程,我建议使用 Parallel LINQ。它已经有很多优化,例如列表的智能分区和只产生与 CPU 核心一样多的线程等。
要理解 Task
和 async
,请这样想 - 典型的工作负载通常包括需要等待的 IO。也许你读取一个文件,或者查询一个网络服务,或者访问一个数据库,或者其他什么。关键是 - 你的线程会等待很长时间(至少 CPU 个周期),直到你从某个遥远的目的地得到响应。
在 Olden Days™ 中,这意味着您的线程会被锁定(暂停),直到收到响应。如果您想同时做其他事情,则需要生成一个新线程。这是可行的,但效率不高。每个 OS 线程都会带来很大的开销(内存、内核资源)。你最终可能会遇到多个线程主动燃烧 CPU,这意味着 OS 需要在它们之间切换,以便每个线程获得一点 CPU 时间,而这些 "context switches" 挺贵的。
async
更改了该工作流程。现在您可以在同一个线程上执行多个工作负载。虽然一项工作是 await
从遥远的来源获取结果,但另一项工作可以介入并使用该线程做其他有用的事情。当第二个工作负载达到其自身 await
时,第一个工作负载可以唤醒并继续。
毕竟,产生比 CPU 个核心更多的线程没有意义。你不会以这种方式完成更多工作。恰恰相反 - 更多的时间将花费在切换线程上,而更少的时间将可用于有用的工作。
这就是 Task
/async
/await
最初设计的目的。然而,Parallel LINQ 也利用了它并将其重新用于多线程。在这种情况下,您可以这样看 - 其他线程就是您的主线程是您的主线程正在等待的"faraway destination"。
任务在线程池上执行。这意味着少数线程将服务于大量任务。您有多线程,但不是每个任务都生成一个线程。
你应该使用任务。您的目标应该是使用与 CPU 一样多的线程。一般都是线程池帮你做的。
你是如何衡量表现的?您认为 700
线程会比 4
线程执行的 700
任务运行得更快吗?不,他们不会。
It seems like its not starting multiples Tasks async
你是怎么想到这个的?正如其他人在评论和其他答案中所建议的那样,您可能需要删除线程创建,因为在创建 700
线程后您会降低系统性能,因为您的线程会相互争用处理器时间,而无需任何工作完成得更快。
因此,您需要将用于 IO 操作的 async/await
添加到具有 SendPingAsync
版本的 Foo
方法中。此外,您的方法可以简单化,因为对 listIps.Count() > i + 1
条件的许多检查都是无用的 - 您在 for
条件块中执行此操作:
public static async Task<List<Communication>> Foo(List<Dispositive> listToPing)
{
var listResult = new List<Communication>();
foreach (var item in listToPing)
{
var listIps = item.listIps;
var communication = new Communication
{
IdDispositive = item.Id
};
try
{
var ping = new Ping();
communication.Result = "NOK";
for (var i = 0; i < listIps.Count(); i++)
{
var oPing = await ping.SendPingAsync(listIps.ElementAt(i).IpAddress, 10000);
if (oPing != null)
{
if (oPing.Status.Equals(IPStatus.Success)
{
communication.Result = "OK";
break;
}
}
}
}
catch
{
communication.Result = "NOK";
}
finally
{
listResult.Add(communication);
}
}
return listResult;
}
您的代码的其他问题是 PLINQ
版本不是线程安全的:
init += limitPerThread;
这在并行执行时可能会失败。您可以引入一些辅助方法,例如 this answer:
private async Task<List<PingReply>> PingAsync(List<Communication> theListOfIPs)
{
Ping pingSender = new Ping();
var tasks = theListOfIPs.Select(ip => pingSender.SendPingAsync(ip, 10000));
var results = await Task.WhenAll(tasks);
return results.ToList();
}
并进行这种检查(为简单起见删除了try/catch
逻辑):
public static async Task<List<Communication>> Foo(List<Dispositive> listToPing)
{
var listResult = new List<Communication>();
foreach (var item in listToPing)
{
var listIps = item.listIps;
var communication = new Communication
{
IdDispositive = item.Id
};
var check = await PingAsync(listIps);
communication.Result = check.Any(p => p.Status.Equals(IPStatus.Success)) ? "OK" : "NOK";
}
}
您可能应该使用 Task.Run
而不是 Task.StartNew
以确保您没有阻塞 UI 线程。
var finalList = new List<string>();
var list = new List<int> {1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ................. 999999};
var init = 0;
var limitPerThread = 5;
var countDownEvent = new CountdownEvent(list.Count);
for (var i = 0; i < list.Count; i++)
{
var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
new Thread(delegate()
{
Foo(listToFilter);
countDownEvent.Signal();
}).Start();
init += limitPerThread;
}
//wait all to finish
countDownEvent.Wait();
private static void Foo(List<int> listToFilter)
{
var listDone = Boo(listToFilter);
lock (Object)
{
finalList.AddRange(listDone);
}
}
这不是:
var taskList = new List<Task>();
for (var i = 0; i < list.Count; i++)
{
var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
var task = Task.Factory.StartNew(() => Foo(listToFilter));
taskList.add(task);
init += limitPerThread;
}
//wait all to finish
Task.WaitAll(taskList.ToArray());
这个进程最后至少要创建700个线程。当我 运行 使用线程时,它会工作并创建所有线程。但是 Task
它没有..它似乎不是开始倍数 Tasks
async.
我真的很想知道为什么....有什么想法吗?
编辑
带有 PLINQ 的另一个版本(按照建议)。
var taskList = new List<Task>(list.Count);
Parallel.ForEach(taskList, t =>
{
var listToFilter = list.Skip(init).Take(limitPerThread).ToList();
Foo(listToFilter);
init += limitPerThread;
t.Start();
});
Task.WaitAll(taskList.ToArray());
EDIT2:
public static List<Communication> Foo(List<Dispositive> listToPing)
{
var listResult = new List<Communication>();
foreach (var item in listToPing)
{
var listIps = item.listIps;
var communication = new Communication
{
IdDispositive = item.Id
};
try
{
for (var i = 0; i < listIps.Count(); i++)
{
var oPing = new Ping().Send(listIps.ElementAt(i).IpAddress, 10000);
if (oPing != null)
{
if (oPing.Status.Equals(IPStatus.TimedOut) && listIps.Count() > i+1)
continue;
if (oPing.Status.Equals(IPStatus.TimedOut))
{
communication.Result = "NOK";
break;
}
communication.Result = oPing.Status.Equals(IPStatus.Success) ? "OK" : "NOK";
break;
}
if (listIps.Count() > i+1)
continue;
communication.Result = "NOK";
break;
}
}
catch
{
communication.Result = "NOK";
}
finally
{
listResult.Add(communication);
}
}
return listResult;
}
Task
s 不是多线程。它们 可以 用于此目的,但大多数情况下它们实际上用于相反的用途 - 在单个线程上进行多路复用。
要将任务用于多线程,我建议使用 Parallel LINQ。它已经有很多优化,例如列表的智能分区和只产生与 CPU 核心一样多的线程等。
要理解 Task
和 async
,请这样想 - 典型的工作负载通常包括需要等待的 IO。也许你读取一个文件,或者查询一个网络服务,或者访问一个数据库,或者其他什么。关键是 - 你的线程会等待很长时间(至少 CPU 个周期),直到你从某个遥远的目的地得到响应。
在 Olden Days™ 中,这意味着您的线程会被锁定(暂停),直到收到响应。如果您想同时做其他事情,则需要生成一个新线程。这是可行的,但效率不高。每个 OS 线程都会带来很大的开销(内存、内核资源)。你最终可能会遇到多个线程主动燃烧 CPU,这意味着 OS 需要在它们之间切换,以便每个线程获得一点 CPU 时间,而这些 "context switches" 挺贵的。
async
更改了该工作流程。现在您可以在同一个线程上执行多个工作负载。虽然一项工作是 await
从遥远的来源获取结果,但另一项工作可以介入并使用该线程做其他有用的事情。当第二个工作负载达到其自身 await
时,第一个工作负载可以唤醒并继续。
毕竟,产生比 CPU 个核心更多的线程没有意义。你不会以这种方式完成更多工作。恰恰相反 - 更多的时间将花费在切换线程上,而更少的时间将可用于有用的工作。
这就是 Task
/async
/await
最初设计的目的。然而,Parallel LINQ 也利用了它并将其重新用于多线程。在这种情况下,您可以这样看 - 其他线程就是您的主线程是您的主线程正在等待的"faraway destination"。
任务在线程池上执行。这意味着少数线程将服务于大量任务。您有多线程,但不是每个任务都生成一个线程。
你应该使用任务。您的目标应该是使用与 CPU 一样多的线程。一般都是线程池帮你做的。
你是如何衡量表现的?您认为 700
线程会比 4
线程执行的 700
任务运行得更快吗?不,他们不会。
It seems like its not starting multiples Tasks async
你是怎么想到这个的?正如其他人在评论和其他答案中所建议的那样,您可能需要删除线程创建,因为在创建 700
线程后您会降低系统性能,因为您的线程会相互争用处理器时间,而无需任何工作完成得更快。
因此,您需要将用于 IO 操作的 async/await
添加到具有 SendPingAsync
版本的 Foo
方法中。此外,您的方法可以简单化,因为对 listIps.Count() > i + 1
条件的许多检查都是无用的 - 您在 for
条件块中执行此操作:
public static async Task<List<Communication>> Foo(List<Dispositive> listToPing)
{
var listResult = new List<Communication>();
foreach (var item in listToPing)
{
var listIps = item.listIps;
var communication = new Communication
{
IdDispositive = item.Id
};
try
{
var ping = new Ping();
communication.Result = "NOK";
for (var i = 0; i < listIps.Count(); i++)
{
var oPing = await ping.SendPingAsync(listIps.ElementAt(i).IpAddress, 10000);
if (oPing != null)
{
if (oPing.Status.Equals(IPStatus.Success)
{
communication.Result = "OK";
break;
}
}
}
}
catch
{
communication.Result = "NOK";
}
finally
{
listResult.Add(communication);
}
}
return listResult;
}
您的代码的其他问题是 PLINQ
版本不是线程安全的:
init += limitPerThread;
这在并行执行时可能会失败。您可以引入一些辅助方法,例如 this answer:
private async Task<List<PingReply>> PingAsync(List<Communication> theListOfIPs)
{
Ping pingSender = new Ping();
var tasks = theListOfIPs.Select(ip => pingSender.SendPingAsync(ip, 10000));
var results = await Task.WhenAll(tasks);
return results.ToList();
}
并进行这种检查(为简单起见删除了try/catch
逻辑):
public static async Task<List<Communication>> Foo(List<Dispositive> listToPing)
{
var listResult = new List<Communication>();
foreach (var item in listToPing)
{
var listIps = item.listIps;
var communication = new Communication
{
IdDispositive = item.Id
};
var check = await PingAsync(listIps);
communication.Result = check.Any(p => p.Status.Equals(IPStatus.Success)) ? "OK" : "NOK";
}
}
您可能应该使用 Task.Run
而不是 Task.StartNew
以确保您没有阻塞 UI 线程。