运行 C# 中的多个任务
Running Multiple Tasks in C#
我正在开发一个具有一些非实时功能的实时软件。
规格:
- 需要实时执行方法1;
- 需要每 60 分钟执行一次 方法 2。
我应该使用多线程吗?任务?
现在我正在使用定时器,但我认为这不是一个很好的用途。
当文件出现在文件夹中时立即开始文件处理通常不是一个好主意。例如,如果 100 个文件同时到达,那么您最终将有 100 个线程都在竞争相同的物理资源,例如磁头等。在我看来,最好的方法是实时,或者至少尽可能实时使用 .Net 通知新文件存在,然后使用生产者-消费者队列处理这些文件
最好使用线程安全的集合,例如ConcurrentQueue with the number of active long running, e.g. emailing threads in process at any time controlled by a synchronization mechanism such as SemaphoreSlim
使用 FileSystemWatcher class. Execution of processing in a time based manner should, ideally, use the Observer Pattern
可以轻松实现目录中新创建文件的通知
我在下面创建了一个简单的应用程序来演示这些应该有帮助的概念。
该应用程序包含多个键 classes
- SingletonBase class 为 classes 实现了 Singleton Pattern,应该只实例化一次,例如文件系统观察者 class
- FileSystemMonitor class 监视正在创建的新文件的目录并立即 通知处理队列有一个新文件文件已存在。
当然,您可以很容易地修改它以开始立即处理文件,但如上所述,这通常不是一个好主意
FilesWorkerclass处理队列访问和相关任务同步。
using System;
using System.Collections.Concurrent;
using System.Globalization;
using System.Reactive.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using System.Security.Permissions;
namespace ConsoleApplication9
{
internal class Program
{
private static void Main(string[] args)
{
const string directorytowatch = @"d:\junk\watch\"; // the directory to watch for new files
// this initiates a filesystemmonitor to watch for new files being created
Task.Factory.StartNew(() => FileSystemMonitor.Instance.WatchDirectory(directorytowatch));
// initiate the processing of any new files
FilesWorker.Instance.ReadQueue();
Console.ReadLine();
}
}
/// <summary>
/// Monitors the filesystem in "real-time" to check for new files
/// </summary>
[PermissionSet(SecurityAction.Demand, Name = "FullTrust")]
internal class FileSystemMonitor : SingletonBase<FileSystemMonitor>
{
private FileSystemMonitor()
{
}
internal void WatchDirectory(string dir)
{
var watcher = new FileSystemWatcher(dir)
{
NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite | NotifyFilters.LastAccess,
Filter = "*.*"
};
// watch all files
watcher.Created += WatcherOnCreated;
watcher.EnableRaisingEvents = true;
}
private static void WatcherOnCreated(object sender, FileSystemEventArgs fileSystemEventArgs)
{
Console.WriteLine(fileSystemEventArgs.FullPath + "" + fileSystemEventArgs.ChangeType); // for test purposes
var fileInfo = new FileInfo(fileSystemEventArgs.FullPath);
FilesWorker.Instance.AddToQueue(fileInfo);
}
}
/// <summary>
/// handles the queue of files to be processed and the syncronisation of tasks related to the queue
/// </summary>
internal class FilesWorker : SingletonBase<FilesWorker>
{
private FilesWorker()
{
}
/// <summary>
/// The queue of files which still need to be processed
/// </summary>
private readonly ConcurrentQueue<FileInfo> _filesQueue = new ConcurrentQueue<FileInfo>();
/// <summary>
/// create a semaphore to limit the number of threads which can process a file at any given time
// In this case only allow 2 to be processed at any given time
/// </summary>
private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(2, 2);
/// <summary>
/// add new file to the queue
/// </summary>
/// <param name="fileInfo"></param>
internal void AddToQueue(FileInfo fileInfo)
{
_filesQueue.Enqueue(fileInfo);
}
/// <summary>
/// executes a method on a given timeframe
/// </summary>
/// <param name="method">method to execute</param>
/// <param name="timer">time between execution runs (seconds)</param>
internal void ExecuteMethod(Action method, double timer)
{
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(timer));
// Token for cancelation
var source = new CancellationTokenSource();
observable.Subscribe(x =>
{
var task = new Task(method);
task.Start();
}, source.Token);
}
/// <summary>
/// Get any new files and send for processing
/// </summary>
internal void ReadQueue()
{
// check the queue every two seconds
ExecuteMethod(ProcessQueue, 2d);
}
/// <summary>
/// takes files from the queue and starts processing
/// </summary>
internal void ProcessQueue()
{
try
{
Semaphore.Wait();
FileInfo fileInfo;
while (_filesQueue.TryDequeue(out fileInfo))
{
var fileProcessor = new FileProcessor();
fileProcessor.ProcessFile(fileInfo);
}
}
finally
{
Semaphore.Release();
}
}
}
internal class FileProcessor
{
internal void ProcessFile(FileInfo fileInfo)
{
// do some long running tasks with the file
}
}
/// <summary>
/// Implements singleton pattern on all classes which derive from it
/// </summary>
/// <typeparam name="T">Derived class</typeparam>
public abstract class SingletonBase<T> where T : class
{
public static T Instance
{
get { return SingletonFactory.Instance; }
}
/// <summary>
/// The singleton class factory to create the singleton instance.
/// </summary>
private class SingletonFactory
{
static SingletonFactory()
{
}
private SingletonFactory()
{
}
internal static readonly T Instance = GetInstance();
private static T GetInstance()
{
var theType = typeof(T);
T inst;
try
{
inst = (T)theType
.InvokeMember(theType.Name,
BindingFlags.CreateInstance | BindingFlags.Instance
| BindingFlags.NonPublic,
null, null, null,
CultureInfo.InvariantCulture);
}
catch (MissingMethodException ex)
{
var exception = new TypeLoadException(string.Format(
CultureInfo.CurrentCulture,
"The type '{0}' must have a private constructor to " +
"be used in the Singleton pattern.", theType.FullName)
, ex);
//LogManager.LogException(LogManager.EventIdInternal, exception, "error in instantiating the singleton");
throw exception;
}
return inst;
}
}
}
}
我正在开发一个具有一些非实时功能的实时软件。
规格:
- 需要实时执行方法1;
- 需要每 60 分钟执行一次 方法 2。
我应该使用多线程吗?任务?
现在我正在使用定时器,但我认为这不是一个很好的用途。
当文件出现在文件夹中时立即开始文件处理通常不是一个好主意。例如,如果 100 个文件同时到达,那么您最终将有 100 个线程都在竞争相同的物理资源,例如磁头等。在我看来,最好的方法是实时,或者至少尽可能实时使用 .Net 通知新文件存在,然后使用生产者-消费者队列处理这些文件
最好使用线程安全的集合,例如ConcurrentQueue with the number of active long running, e.g. emailing threads in process at any time controlled by a synchronization mechanism such as SemaphoreSlim
使用 FileSystemWatcher class. Execution of processing in a time based manner should, ideally, use the Observer Pattern
可以轻松实现目录中新创建文件的通知我在下面创建了一个简单的应用程序来演示这些应该有帮助的概念。
该应用程序包含多个键 classes
- SingletonBase class 为 classes 实现了 Singleton Pattern,应该只实例化一次,例如文件系统观察者 class
- FileSystemMonitor class 监视正在创建的新文件的目录并立即 通知处理队列有一个新文件文件已存在。 当然,您可以很容易地修改它以开始立即处理文件,但如上所述,这通常不是一个好主意
FilesWorkerclass处理队列访问和相关任务同步。
using System; using System.Collections.Concurrent; using System.Globalization; using System.Reactive.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; using System.IO; using System.Security.Permissions; namespace ConsoleApplication9 { internal class Program { private static void Main(string[] args) { const string directorytowatch = @"d:\junk\watch\"; // the directory to watch for new files // this initiates a filesystemmonitor to watch for new files being created Task.Factory.StartNew(() => FileSystemMonitor.Instance.WatchDirectory(directorytowatch)); // initiate the processing of any new files FilesWorker.Instance.ReadQueue(); Console.ReadLine(); } } /// <summary> /// Monitors the filesystem in "real-time" to check for new files /// </summary> [PermissionSet(SecurityAction.Demand, Name = "FullTrust")] internal class FileSystemMonitor : SingletonBase<FileSystemMonitor> { private FileSystemMonitor() { } internal void WatchDirectory(string dir) { var watcher = new FileSystemWatcher(dir) { NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite | NotifyFilters.LastAccess, Filter = "*.*" }; // watch all files watcher.Created += WatcherOnCreated; watcher.EnableRaisingEvents = true; } private static void WatcherOnCreated(object sender, FileSystemEventArgs fileSystemEventArgs) { Console.WriteLine(fileSystemEventArgs.FullPath + "" + fileSystemEventArgs.ChangeType); // for test purposes var fileInfo = new FileInfo(fileSystemEventArgs.FullPath); FilesWorker.Instance.AddToQueue(fileInfo); } } /// <summary> /// handles the queue of files to be processed and the syncronisation of tasks related to the queue /// </summary> internal class FilesWorker : SingletonBase<FilesWorker> { private FilesWorker() { } /// <summary> /// The queue of files which still need to be processed /// </summary> private readonly ConcurrentQueue<FileInfo> _filesQueue = new ConcurrentQueue<FileInfo>(); /// <summary> /// create a semaphore to limit the number of threads which can process a file at any given time // In this case only allow 2 to be processed at any given time /// </summary> private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(2, 2); /// <summary> /// add new file to the queue /// </summary> /// <param name="fileInfo"></param> internal void AddToQueue(FileInfo fileInfo) { _filesQueue.Enqueue(fileInfo); } /// <summary> /// executes a method on a given timeframe /// </summary> /// <param name="method">method to execute</param> /// <param name="timer">time between execution runs (seconds)</param> internal void ExecuteMethod(Action method, double timer) { IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(timer)); // Token for cancelation var source = new CancellationTokenSource(); observable.Subscribe(x => { var task = new Task(method); task.Start(); }, source.Token); } /// <summary> /// Get any new files and send for processing /// </summary> internal void ReadQueue() { // check the queue every two seconds ExecuteMethod(ProcessQueue, 2d); } /// <summary> /// takes files from the queue and starts processing /// </summary> internal void ProcessQueue() { try { Semaphore.Wait(); FileInfo fileInfo; while (_filesQueue.TryDequeue(out fileInfo)) { var fileProcessor = new FileProcessor(); fileProcessor.ProcessFile(fileInfo); } } finally { Semaphore.Release(); } } } internal class FileProcessor { internal void ProcessFile(FileInfo fileInfo) { // do some long running tasks with the file } } /// <summary> /// Implements singleton pattern on all classes which derive from it /// </summary> /// <typeparam name="T">Derived class</typeparam> public abstract class SingletonBase<T> where T : class { public static T Instance { get { return SingletonFactory.Instance; } } /// <summary> /// The singleton class factory to create the singleton instance. /// </summary> private class SingletonFactory { static SingletonFactory() { } private SingletonFactory() { } internal static readonly T Instance = GetInstance(); private static T GetInstance() { var theType = typeof(T); T inst; try { inst = (T)theType .InvokeMember(theType.Name, BindingFlags.CreateInstance | BindingFlags.Instance | BindingFlags.NonPublic, null, null, null, CultureInfo.InvariantCulture); } catch (MissingMethodException ex) { var exception = new TypeLoadException(string.Format( CultureInfo.CurrentCulture, "The type '{0}' must have a private constructor to " + "be used in the Singleton pattern.", theType.FullName) , ex); //LogManager.LogException(LogManager.EventIdInternal, exception, "error in instantiating the singleton"); throw exception; } return inst; } } } }