有效地使用数千个超时任务
Using thousands of Tasks with a timeout efficiently
我正在实现一个库 L,它通过套接字与另一个应用程序 A 通信。
基本工作流程如下:
- L 连接到 A。
- L 将~50.000 条信息 I 发送给 A,并且
为每个发出的 I 创建一个任务 T。
- L 侦听来自 A 的传入结果,一旦有结果,就使用
TaskCompletionSource 设置任务的结果 T
- L 创建一个设置了超时的任务 T2 (Task.WhenAny(T,Task.Delay(xx))
- L 使用 Task.WhenAll(T2) 等待所有发送信息的超时或结果。
管理底层数据结构完全没有问题。主要问题是在我的电脑上组装 "main" Task.WhenAll(T2) 大约需要 5-6 秒。 50.000 个条目(创建 50.000*2+1 个任务)。
但是,我想不出更轻量级的方法来完成同样的任务。它应该使用所有可用的核心并且是非阻塞的,并且还支持超时。
有没有办法使用并行或线程池 类 来提高性能?
编辑:
显示基本设置的代码:
https://dotnetfiddle.net/gIq2DP
开始总共 n LongRunningTasks
,其中 n 是您机器上的内核数。每个任务应该 运行 在一个核心上。为每个要发送的 I 创建 50K 新任务是一种浪费。相反,将任务设计为接受 I 和套接字信息 - 这些信息将发送到哪里 .
创建 BlockingCollection<Tuple<I, SocketInfo>>
。启动一项任务来填充此阻塞集合。您之前创建的其他 n 长 运行ning 任务可以继续获取信息元组和发送信息的地址,然后在一个循环中为您执行该工作,该循环将在阻塞收集完成时中断。
可以在长 运行ning 任务本身中设置超时。
整个设置将使您的 CPU 最大限度地忙于 有用的工作 ,而不是让它不必要地忙于 "job" 的 [=26] =]创建了 50K 个任务.
由于发生在主内存之外的操作(如此网络操作)对于 CPU 是 ,请随意设置 n 不是刚好等于您机器中的内核数量,甚至是该值的三倍。在我的代码演示中,我将其设置为仅等于内核数。
使用提供的代码 link,这是一种方式...
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Net.NetworkInformation;
using System.Threading.Tasks;
namespace TestConsoleApplication
{
public static class Test
{
public static void Main()
{
TaskRunningTest();
}
private static void TaskRunningTest()
{
var s = new Stopwatch();
const int totalInformationChunks = 50000;
var baseProcessorTaskArray = new Task[Environment.ProcessorCount];
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var tcs = new TaskCompletionSource<int>();
var itemsToProcess = new BlockingCollection<Tuple<Information, Address>>(totalInformationChunks);
s.Start();
//Start a new task to populate the "itemsToProcess"
taskFactory.StartNew(() =>
{
// Add Tuples of Information and Address to which this information is to be sent to.
Console.WriteLine("Done intializing all the jobs...");
// Finally signal that you are done by saying..
itemsToProcess.CompleteAdding();
});
//Initializing the base tasks
for (var index = 0; index < baseProcessorTaskArray.Length; index++)
{
var thisIndex = index;
baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
{
while (!itemsToProcess.IsAddingCompleted && itemsToProcess.Count != 0)
{
Tuple<Information, Address> item;
itemsToProcess.TryTake(out item);
//Process the item
tcs.TrySetResult(thisIndex);
}
});
}
// Need to provide new timeout logic now
// Depending upon what you are trying to achieve with timeout, you can devise out the way
// Wait for the base tasks to completely empty OR
// timeout and then stop the stopwatch.
Task.WaitAll(baseProcessorTaskArray);
s.Stop();
Console.WriteLine(s.ElapsedMilliseconds);
}
private class Address
{
//This class should have the socket information
}
private class Information
{
//This class will have the Information to send
}
}
}
分析显示大部分时间(90%?)花在计时器设置、到期和处置上。这对我来说似乎是合理的。
也许您可以创建自己的超级便宜的超时机制。将超时排队到按到期时间排序的优先级队列中。然后,运行 每 100 毫秒一个计时器,并使该计时器使优先级队列中到期的所有内容都过期。
这样做的成本是每个超时 TaskCompletionSource
和一些小的进一步处理。
您甚至可以通过将超时从队列中移除并删除 TaskCompletionSource
.
来取消超时
我正在实现一个库 L,它通过套接字与另一个应用程序 A 通信。
基本工作流程如下:
- L 连接到 A。
- L 将~50.000 条信息 I 发送给 A,并且 为每个发出的 I 创建一个任务 T。
- L 侦听来自 A 的传入结果,一旦有结果,就使用 TaskCompletionSource 设置任务的结果 T
- L 创建一个设置了超时的任务 T2 (Task.WhenAny(T,Task.Delay(xx))
- L 使用 Task.WhenAll(T2) 等待所有发送信息的超时或结果。
管理底层数据结构完全没有问题。主要问题是在我的电脑上组装 "main" Task.WhenAll(T2) 大约需要 5-6 秒。 50.000 个条目(创建 50.000*2+1 个任务)。
但是,我想不出更轻量级的方法来完成同样的任务。它应该使用所有可用的核心并且是非阻塞的,并且还支持超时。
有没有办法使用并行或线程池 类 来提高性能?
编辑: 显示基本设置的代码: https://dotnetfiddle.net/gIq2DP
开始总共 n LongRunningTasks
,其中 n 是您机器上的内核数。每个任务应该 运行 在一个核心上。为每个要发送的 I 创建 50K 新任务是一种浪费。相反,将任务设计为接受 I 和套接字信息 - 这些信息将发送到哪里 .
创建 BlockingCollection<Tuple<I, SocketInfo>>
。启动一项任务来填充此阻塞集合。您之前创建的其他 n 长 运行ning 任务可以继续获取信息元组和发送信息的地址,然后在一个循环中为您执行该工作,该循环将在阻塞收集完成时中断。
可以在长 运行ning 任务本身中设置超时。
整个设置将使您的 CPU 最大限度地忙于 有用的工作 ,而不是让它不必要地忙于 "job" 的 [=26] =]创建了 50K 个任务.
由于发生在主内存之外的操作(如此网络操作)对于 CPU 是
使用提供的代码 link,这是一种方式...
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Net.NetworkInformation;
using System.Threading.Tasks;
namespace TestConsoleApplication
{
public static class Test
{
public static void Main()
{
TaskRunningTest();
}
private static void TaskRunningTest()
{
var s = new Stopwatch();
const int totalInformationChunks = 50000;
var baseProcessorTaskArray = new Task[Environment.ProcessorCount];
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var tcs = new TaskCompletionSource<int>();
var itemsToProcess = new BlockingCollection<Tuple<Information, Address>>(totalInformationChunks);
s.Start();
//Start a new task to populate the "itemsToProcess"
taskFactory.StartNew(() =>
{
// Add Tuples of Information and Address to which this information is to be sent to.
Console.WriteLine("Done intializing all the jobs...");
// Finally signal that you are done by saying..
itemsToProcess.CompleteAdding();
});
//Initializing the base tasks
for (var index = 0; index < baseProcessorTaskArray.Length; index++)
{
var thisIndex = index;
baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
{
while (!itemsToProcess.IsAddingCompleted && itemsToProcess.Count != 0)
{
Tuple<Information, Address> item;
itemsToProcess.TryTake(out item);
//Process the item
tcs.TrySetResult(thisIndex);
}
});
}
// Need to provide new timeout logic now
// Depending upon what you are trying to achieve with timeout, you can devise out the way
// Wait for the base tasks to completely empty OR
// timeout and then stop the stopwatch.
Task.WaitAll(baseProcessorTaskArray);
s.Stop();
Console.WriteLine(s.ElapsedMilliseconds);
}
private class Address
{
//This class should have the socket information
}
private class Information
{
//This class will have the Information to send
}
}
}
分析显示大部分时间(90%?)花在计时器设置、到期和处置上。这对我来说似乎是合理的。
也许您可以创建自己的超级便宜的超时机制。将超时排队到按到期时间排序的优先级队列中。然后,运行 每 100 毫秒一个计时器,并使该计时器使优先级队列中到期的所有内容都过期。
这样做的成本是每个超时 TaskCompletionSource
和一些小的进一步处理。
您甚至可以通过将超时从队列中移除并删除 TaskCompletionSource
.