How to implement a continuous producer-consumer pattern inside a Windows Service

Here is what I am trying to do:

My service has a constructor such as

public MyService()
{
    Ticker.Elapsed += FillQueue;
}

I start that timer when the service starts, like this:

protected override void OnStart(string[] args)
{
    Ticker.Enabled = true;
    Task.Run(() => { ConsumeWork(); });
}

My FillQueue looks like this:

private static async void FillQueue(object source, ElapsedEventArgs e)   
{
    var items = GetUnprocessedItemsFromDb();
    foreach(var item in items)
    {
        if(!Work.Contains(item))
        {
            Work.Enqueue(item);
        }   
    }
}

My ConsumeWork looks like this:

private static void ConsumeWork()
{
    while(true)
    {
        if(Work.Count > 0)
        {
            var item = Work.Peek();
            Process(item);
            Work.Dequeue();
        }
        else
        {
            Thread.Sleep(500);
        }
    }
}

However, this is probably a naive implementation, and I wonder whether .NET has a class that does exactly what I need in this situation.

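You can use an ActionBlock for the processing; it has a built-in queue that you can post work to. You can read up on TPL Dataflow here: Intro to TPL-Dataflow, and also Introduction to Dataflow, Part 1. Finally, here is a quick sample to get you started. I have left out a lot, but it should at least get you going.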

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace MyWorkProcessor {

    public class WorkProcessor {

        public WorkProcessor() {
            Processor = CreatePipeline();
        }    

        public async Task StartProcessing() {
            try {
                await Task.Run(() => GetWorkFromDatabase());
            } catch (OperationCanceledException) {
                //handle cancel
            }
        }

        private CancellationTokenSource cts {
            get;
            set;
        } = new CancellationTokenSource(); // must exist before CreatePipeline() reads cts.Token

        private ITargetBlock<WorkItem> Processor {
            get;
        }

        private TimeSpan DatabasePollingFrequency {
            get;
        } = TimeSpan.FromSeconds(5);

        private ITargetBlock<WorkItem> CreatePipeline() {
            var options = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 100,
                CancellationToken = cts.Token
            };
            return new ActionBlock<WorkItem>(item => ProcessWork(item), options);
        }

        private async Task GetWorkFromDatabase() {
            while (!cts.IsCancellationRequested) {
                var work = await GetWork();
                await Processor.SendAsync(work);
                // pass the token so that a cancel request also interrupts the wait
                await Task.Delay(DatabasePollingFrequency, cts.Token);
            }
        }

        private async Task<WorkItem> GetWork() {
            return await Context.GetWork();
        }

        private void ProcessWork(WorkItem item) {
            //do processing
        }
    }
}
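
To make the "built-in queue" point concrete, here is a minimal, self-contained sketch of a bounded ActionBlock. The class name ActionBlockDemo, the method RunAsync, and the capacity of 10 are made up for illustration and are not part of the sample above; it assumes the System.Threading.Tasks.Dataflow library is available to the project.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the answer's WorkProcessor.
public static class ActionBlockDemo
{
    // Sends the integers 1..count through a bounded ActionBlock and returns their sum.
    public static async Task<int> RunAsync(int count)
    {
        var sum = 0;
        var options = new ExecutionDataflowBlockOptions
        {
            // At most 10 items are buffered; SendAsync waits while the queue is full.
            BoundedCapacity = 10
        };

        // With the default MaxDegreeOfParallelism of 1 the delegate handles one
        // item at a time, so the unsynchronized sum is safe in this sketch.
        var block = new ActionBlock<int>(item => { sum += item; }, options);

        for (var i = 1; i <= count; i++)
        {
            // The producer is throttled here instead of growing an unbounded queue.
            await block.SendAsync(i);
        }

        block.Complete();        // signal that no more items will arrive
        await block.Completion;  // wait until every queued item has been processed
        return sum;
    }
}
```

The producer loop in GetWorkFromDatabase above plays the role of the for loop here: because the block is bounded, a slow consumer slows the producer down rather than letting the queue grow without limit.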

Although @JSteward's answer is a good start, you can improve it by mixing the TPL Dataflow and Rx.NET extensions, as a dataflow block can easily become an observer for your data, and with the Rx Timer it will be much less effort for you (Rx.Timer explanation).

We can adapt the MSDN article to your needs, like this:

private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

var source =
    // sequence of Int64 numbers, starting from 0
    // https://msdn.microsoft.com/en-us/library/hh229435.aspx
    Observable.Timer(
        // fire first event after 1 minute waiting
        TimeSpan.FromSeconds(DueIntervalInSeconds),
        // fire all next events each 5 seconds
        TimeSpan.FromSeconds(EventIntervalInSeconds))
    // each number will have a timestamp
    .Timestamp()
    // each time we select some items to process
    .SelectMany(GetItemsFromDB)
    // filter already added
    .Where(i => !_processedItemIds.ContainsKey(i.Id));

var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
    {
        // we can process as many items in parallel as there are processors
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

IDisposable subscription = source.Subscribe(action.AsObserver());

Also, your check for already processed items is not quite accurate: an item can be selected from the database as unprocessed at the very moment you finish processing it but have not yet updated it in the database. In that case the item is removed from the Queue<T> and then added again by the producer. That is why I added a ConcurrentDictionary<TKey, TValue>, used as a thread-safe set of ids, to this solution (HashSet<T> is not thread-safe, and ConcurrentBag<T> cannot remove a specific element):

private static async Task ProcessItem(Item item)
{
    // TryAdd is atomic: only the first caller for a given id gets past this check
    if (!_processedItemIds.TryAdd(item.Id, 0))
    {
        return;
    }

    // actual work here

    // save item as processed in database

    // we need to wait to ensure the item does not appear in the queue again
    await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));

    // clear the processed cache to reduce memory usage
    byte ignored;
    _processedItemIds.TryRemove(item.Id, out ignored);
}

public class Item
{
    public Guid Id { get; set; }
}

// temporary cache for items in process (the byte value is unused)
private static ConcurrentDictionary<Guid, byte> _processedItemIds = new ConcurrentDictionary<Guid, byte>();

private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // log event timing
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

    // return items from DB
    return new[] { new Item { Id = Guid.NewGuid() } };
}

You can implement the cache cleanup in other ways, for example by starting a "GC" timer that periodically removes processed items from the cache.
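
A minimal sketch of such a timer-based cleanup, assuming the cache stores the time at which each id was marked as processed. The class name ProcessedCache, its method names, and the timeToLive and sweepInterval parameters are hypothetical and only serve the illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: remembers processed ids for a limited time.
public sealed class ProcessedCache : IDisposable
{
    private readonly ConcurrentDictionary<Guid, DateTime> _entries =
        new ConcurrentDictionary<Guid, DateTime>();
    private readonly TimeSpan _timeToLive;
    private readonly Timer _sweeper;

    public ProcessedCache(TimeSpan timeToLive, TimeSpan sweepInterval)
    {
        _timeToLive = timeToLive;
        // The "GC" timer: periodically drops entries older than timeToLive.
        _sweeper = new Timer(_ => RemoveExpired(DateTime.UtcNow),
                             null, sweepInterval, sweepInterval);
    }

    // Returns true only for the first caller that claims the id.
    public bool TryMarkProcessed(Guid id)
    {
        return _entries.TryAdd(id, DateTime.UtcNow);
    }

    public bool Contains(Guid id)
    {
        return _entries.ContainsKey(id);
    }

    // Removes entries at least timeToLive old; returns how many were removed.
    public int RemoveExpired(DateTime utcNow)
    {
        var removed = 0;
        // Enumerating a ConcurrentDictionary while removing from it is safe.
        foreach (var entry in _entries)
        {
            if (utcNow - entry.Value >= _timeToLive &&
                _entries.TryRemove(entry.Key, out _))
            {
                removed++;
            }
        }
        return removed;
    }

    public void Dispose()
    {
        _sweeper.Dispose();
    }
}
```

With a cache along these lines, ProcessItem would not need to wait on Task.Delay before removing the id, so a finished item should not keep one of the block's parallel slots occupied during the wait.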

To stop the events and the item processing, you should Dispose the subscription and, perhaps, Complete the ActionBlock:

subscription.Dispose();
action.Complete();
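
Complete only tells the block to stop accepting new items; items that are already queued are still processed. If the service should not exit before they are done, one option is to wait on the block's Completion task with a timeout, for example from OnStop. The sketch below assumes that approach; the helper name StopAndDrain and the timeout parameter are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for a bounded wait during service shutdown.
public static class ShutdownDemo
{
    // Returns true if all queued items were processed within the timeout.
    public static bool StopAndDrain<T>(ActionBlock<T> action, TimeSpan timeout)
    {
        // Stop accepting new items; already queued items are still processed.
        action.Complete();

        // Block until the queue is drained or the timeout elapses.
        // Wait rethrows (as AggregateException) if the block faulted.
        return action.Completion.Wait(timeout);
    }
}
```

After Complete has been called, Post returns false for any further item, so a producer that is still running will not silently enqueue more work.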

You can find more information about Rx.NET in their guidelines on GitHub.