运行 C# 中的多个任务

Running Multiple Tasks in C#

我正在开发一个具有一些非实时功能的实时软件。

规格:

我应该使用多线程吗?任务?

现在我正在使用定时器,但我认为这不是一个很好的用途。

当文件出现在文件夹中时立即开始文件处理通常不是一个好主意。例如,如果 100 个文件同时到达,那么您最终将有 100 个线程都在竞争相同的物理资源,例如磁头等。在我看来,最好的方法是实时,或者至少尽可能实时使用 .Net 通知新文件存在,然后使用生产者-消费者队列处理这些文件

最好使用线程安全的集合,例如ConcurrentQueue with the number of active long running, e.g. emailing threads in process at any time controlled by a synchronization mechanism such as SemaphoreSlim

使用 FileSystemWatcher class. Execution of processing in a time based manner should, ideally, use the Observer Pattern

可以轻松实现目录中新创建文件的通知

我在下面创建了一个简单的应用程序来演示这些应该有帮助的概念。

该应用程序包含多个键 classes

  1. SingletonBase class 为 classes 实现了 Singleton Pattern,应该只实例化一次,例如文件系统观察者 class
  2. FileSystemMonitor class 监视正在创建的新文件的目录并立即 通知处理队列有一个新文件文件已存在。 当然,您可以很容易地修改它以开始立即处理文件,但如上所述,这通常不是一个好主意
  3. FilesWorkerclass处理队列访问和相关任务同步。

    using System;
    using System.Collections.Concurrent;
    using System.Globalization;
    using System.Reactive.Linq;
    using System.Reflection;
    using System.Threading;
    using System.Threading.Tasks;
    using System.IO;
    using System.Security.Permissions;
    
    
    namespace ConsoleApplication9
    {
        internal class Program
        {
    
            private static void Main(string[] args)
            {
    
    
                const string directorytowatch = @"d:\junk\watch\"; // the directory to watch for new files
                // this initiates a filesystemmonitor to watch for new files being created 
                Task.Factory.StartNew(() => FileSystemMonitor.Instance.WatchDirectory(directorytowatch));          
    
                // initiate the processing of any new files
                FilesWorker.Instance.ReadQueue();
                Console.ReadLine();
    
            }
    
    
        }
    
        /// <summary>
        /// Monitors the filesystem in "real-time" to check for new files
        /// </summary>
        [PermissionSet(SecurityAction.Demand, Name = "FullTrust")]
        internal class FileSystemMonitor : SingletonBase<FileSystemMonitor>
        {
            private FileSystemMonitor()
            {
            }
    
            internal void WatchDirectory(string dir)
            {
                var watcher = new FileSystemWatcher(dir)
                {
                    NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite | NotifyFilters.LastAccess,
                    Filter = "*.*"
                };
    
                // watch all files
                watcher.Created += WatcherOnCreated;
                watcher.EnableRaisingEvents = true;
            }
    
            private static void WatcherOnCreated(object sender, FileSystemEventArgs fileSystemEventArgs)
            {
                Console.WriteLine(fileSystemEventArgs.FullPath + "" + fileSystemEventArgs.ChangeType); // for test purposes
                var fileInfo = new FileInfo(fileSystemEventArgs.FullPath);
                FilesWorker.Instance.AddToQueue(fileInfo);
            }
        }
    
        /// <summary>
        /// handles the queue of files to be processed and the syncronisation of tasks related to the queue
        /// </summary>
        internal class FilesWorker : SingletonBase<FilesWorker>
        {
            private FilesWorker()
            {
            }
    
            /// <summary>
            /// The queue of files which still need to be processed
            /// </summary>
            private readonly ConcurrentQueue<FileInfo> _filesQueue = new ConcurrentQueue<FileInfo>();
    
            /// <summary>
            /// create a semaphore to limit the number of threads which can process a file at any given time
            // In this case only allow 2 to be processed at any given time
            /// </summary>
            private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(2, 2);
    
            /// <summary>
            /// add new file to the queue
            /// </summary>
            /// <param name="fileInfo"></param>
            internal void AddToQueue(FileInfo fileInfo)
            {
                _filesQueue.Enqueue(fileInfo);
            }
    
            /// <summary>
            /// executes a method on a given timeframe
            /// </summary>
            /// <param name="method">method to execute</param>
            /// <param name="timer">time between execution runs (seconds)</param>
            internal void ExecuteMethod(Action method, double timer)
            {
                IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(timer));
                // Token for cancelation
                var source = new CancellationTokenSource();
    
                observable.Subscribe(x =>
                {
                    var task = new Task(method);
                    task.Start();
                }, source.Token);
    
            }
    
            /// <summary>
            /// Get any new files and send for processing
            /// </summary>
            internal void ReadQueue()
            {
                // check the queue every two seconds
                ExecuteMethod(ProcessQueue, 2d);            
            }
    
            /// <summary>
            /// takes files from the queue and starts processing
            /// </summary>
            internal void ProcessQueue()
            {
                try
                {
                    Semaphore.Wait();
                    FileInfo fileInfo;
                    while (_filesQueue.TryDequeue(out fileInfo))
                    {
                        var fileProcessor = new FileProcessor();
                        fileProcessor.ProcessFile(fileInfo);
                    }
                }
                finally
                {
                    Semaphore.Release();
                }
            }
    
        }
    
        internal class FileProcessor
        {
            internal void ProcessFile(FileInfo fileInfo)
            {
                // do some long running tasks with the file
            }
        }
    
    
        /// <summary>
        /// Implements singleton pattern on all classes which derive from it
        /// </summary>
        /// <typeparam name="T">Derived class</typeparam>
        public abstract class SingletonBase<T> where T : class
        {
    
            public static T Instance
            {
                get { return SingletonFactory.Instance; }
            }
    
            /// <summary>
            /// The singleton class factory to create the singleton instance.
            /// </summary>
            private class SingletonFactory
            {
    
                static SingletonFactory()
                {
                }
    
                private SingletonFactory()
                {
                }
    
                internal static readonly T Instance = GetInstance();
    
                private static T GetInstance()
                {
                    var theType = typeof(T);
                    T inst;
                    try
                    {
                        inst = (T)theType
                            .InvokeMember(theType.Name,
                                BindingFlags.CreateInstance | BindingFlags.Instance
                                | BindingFlags.NonPublic,
                                null, null, null,
                                CultureInfo.InvariantCulture);
                    }
                    catch (MissingMethodException ex)
                    {
                        var exception = new TypeLoadException(string.Format(
                            CultureInfo.CurrentCulture,
                            "The type '{0}' must have a private constructor to " +
                            "be used in the Singleton pattern.", theType.FullName)
                            , ex);
                        //LogManager.LogException(LogManager.EventIdInternal, exception, "error in instantiating the singleton");
                        throw exception;
                    }
    
                    return inst;
                }
            }
        }
    }