如何在 Windows 服务中实现连续的生产者-消费者模式
How to implement a continuous producer-consumer pattern inside a Windows Service
这是我正在尝试做的事情:
- 在内存中维护一个待处理项目的队列(即 `IsProcessed = 0` 的项目)
- 每 5 秒从数据库中获取未处理的项目,如果它们尚不在队列中,则将其加入队列
- 持续从队列中取出项目并进行处理;每处理完一个项目,就在数据库中更新它(`IsProcessed = 1`)
- 尽可能并行地完成以上所有工作("as parallel as possible")
我的服务有一个构造函数,例如
/// <summary>
/// Creates the service and hooks the polling timer's Elapsed event up to the
/// queue-filling producer.
/// </summary>
public MyService()
{
    // Explicit delegate construction; equivalent to the method-group shorthand.
    Ticker.Elapsed += new ElapsedEventHandler(FillQueue);
}
我在服务启动时启动该计时器,如
/// <summary>
/// Service start: enables the polling timer and launches the consumer loop.
/// </summary>
protected override void OnStart(string[] args)
{
    Ticker.Enabled = true;

    // ConsumeWork loops forever and blocks with Thread.Sleep. Request a dedicated
    // thread (LongRunning) so it does not permanently occupy a thread-pool worker,
    // as Task.Run would.
    Task.Factory.StartNew(() => ConsumeWork(), TaskCreationOptions.LongRunning);
}
我的 `FillQueue` 如下:
/// <summary>
/// Timer Elapsed handler (producer): loads unprocessed items via
/// GetUnprocessedItemsFromDb and enqueues each one that is not already in Work.
/// </summary>
/// <remarks>
/// Declared as plain void rather than async void. The body awaits nothing, and
/// exceptions escaping an async void method cannot be caught by any caller.
/// NOTE(review): assumes Work is a Queue&lt;T&gt; (not thread-safe) that the consumer
/// thread also reads and dequeues. The check-and-enqueue therefore runs under
/// lock (Work), and every other access to Work must take the same lock. If timer
/// callbacks overlap, the lock also stops them from enqueuing the same item twice.
/// </remarks>
private static void FillQueue(object source, ElapsedEventArgs e)
{
    // Fetch before taking the lock so the consumer is not held up by the query
    // (assumes the returned sequence is already materialized - TODO confirm).
    var items = GetUnprocessedItemsFromDb();

    lock (Work)
    {
        foreach (var item in items)
        {
            if (!Work.Contains(item))
            {
                Work.Enqueue(item);
            }
        }
    }
}
我的 `ConsumeWork` 如下:
/// <summary>
/// Consumer loop. It repeatedly takes the item at the head of Work, processes it,
/// and only then removes it. When the queue is empty it sleeps 500 ms. It never
/// returns.
/// </summary>
/// <remarks>
/// The item stays in the queue while Process runs, so the producer's Work.Contains
/// check still sees it and does not enqueue it a second time.
/// NOTE(review): assumes Work is a Queue&lt;T&gt; (not thread-safe) that the timer
/// thread also writes to. Every access here is therefore made under lock (Work), and
/// all other code touching Work must take the same lock.
/// As in the original, an exception from Process ends this loop.
/// </remarks>
private static void ConsumeWork()
{
    while (true)
    {
        Action processHead = null;

        lock (Work)
        {
            if (Work.Count > 0)
            {
                var item = Work.Peek();
                // Defer the actual processing until the lock is released so the
                // producer is not blocked while the item is being handled.
                processHead = () => Process(item);
            }
        }

        if (processHead == null)
        {
            Thread.Sleep(500);
            continue;
        }

        processHead();

        // No other visible code dequeues, so the head is still the item just processed.
        lock (Work)
        {
            Work.Dequeue();
        }
    }
}
不过,这可能是一种比较幼稚的实现。我想知道 .NET 中是否有现成的类,正好能满足这种场景的需求。
您可以使用 `ActionBlock` 来进行处理,它内置了一个队列,您可以直接向它 `Post` 数据。关于 TPL Dataflow,可以参阅:Intro to TPL-Dataflow 以及 Introduction to Dataflow, Part 1。最后,下面是一个帮助您入门的简单示例。我省略了很多细节,但它至少能让您起步。
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace MyWorkProcessor {
    /// <summary>
    /// Polls a data source on a fixed interval and posts each fetched WorkItem to a
    /// bounded TPL Dataflow ActionBlock that processes it.
    /// </summary>
    public class WorkProcessor {
        public WorkProcessor() {
            Processor = CreatePipeline();
        }

        /// <summary>
        /// Runs the polling loop until <see cref="StopProcessing"/> is called. The
        /// processing block is marked complete whenever the loop ends: normally, by
        /// cancellation, or by an error.
        /// </summary>
        public async Task StartProcessing() {
            try {
                await Task.Run(() => GetWorkFromDatabase());
            } catch (OperationCanceledException) {
                //handle cancel
            } finally {
                // No more items will be posted to the block.
                Processor.Complete();
            }
        }

        /// <summary>
        /// Requests cancellation of both the polling loop and the processing block.
        /// </summary>
        public void StopProcessing() {
            Cancellation.Cancel();
        }

        // Initialized here, before the constructor body runs, because the constructor
        // calls CreatePipeline, which reads Token. In the original the source was never
        // assigned, so constructing a WorkProcessor threw NullReferenceException.
        private CancellationTokenSource Cancellation { get; } = new CancellationTokenSource();

        private ITargetBlock<WorkItem> Processor {
            get;
        }

        private TimeSpan DatabasePollingFrequency {
            get;
        } = TimeSpan.FromSeconds(5);

        /// <summary>
        /// Builds the processing block. It holds at most 100 items and is cancelled
        /// by the same token as the polling loop.
        /// </summary>
        private ITargetBlock<WorkItem> CreatePipeline() {
            var options = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 100,
                CancellationToken = Cancellation.Token
            };
            return new ActionBlock<WorkItem>(item => ProcessWork(item), options);
        }

        /// <summary>
        /// Polling loop: fetch, post, wait. Repeats until cancellation is requested.
        /// </summary>
        private async Task GetWorkFromDatabase() {
            var token = Cancellation.Token;
            while (!token.IsCancellationRequested) {
                var work = await GetWork();
                // SendAsync waits asynchronously while the bounded block is full.
                // Passing the token lets a stop request interrupt that wait and the
                // delay below instead of sitting them out.
                await Processor.SendAsync(work, token);
                await Task.Delay(DatabasePollingFrequency, token);
            }
        }

        private async Task<WorkItem> GetWork() {
            return await Context.GetWork();
        }

        private void ProcessWork(WorkItem item) {
            //do processing
        }
    }
}
尽管 @JSteward 的回答是一个很好的起点,但您还可以通过混合使用 TPL Dataflow 和 Rx.NET 扩展来改进它:数据流块可以很容易地成为您数据的观察者(observer),而借助 Rx 的 Timer(参见 Rx.Timer 的说明),您需要做的工作会少得多。
我们可以根据您的需要改编 MSDN 文章中的示例,如下:
// Polling schedule, in seconds.
private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

// Observable.Timer(dueTime, period) yields 0, 1, 2, ... (Int64). The first tick
// comes after DueIntervalInSeconds and later ticks every EventIntervalInSeconds.
// https://msdn.microsoft.com/en-us/library/hh229435.aspx
var firstTickDelay = TimeSpan.FromSeconds(DueIntervalInSeconds);
var tickPeriod = TimeSpan.FromSeconds(EventIntervalInSeconds);
var ticks = Observable.Timer(firstTickDelay, tickPeriod);

// Stamp each tick with its time, expand it into the items read from the DB, and
// drop any item whose id is already in the in-flight cache.
var source = ticks
    .Timestamp()
    .SelectMany(GetItemsFromDB)
    .Where(candidate => !_processedItemIds.Contains(candidate.Id));

// Allow as many items in flight at once as there are logical processors.
var blockOptions = new ExecutionDataflowBlockOptions();
blockOptions.MaxDegreeOfParallelism = Environment.ProcessorCount;
var action = new ActionBlock<Item>(ProcessItem, blockOptions);

// Feed the timer-driven item stream into the block; dispose this to stop polling.
IDisposable subscription = source.Subscribe(action.AsObserver());
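此外,您对已处理项目的检查并不完全准确:某个项目可能已经处理完毕,但尚未在数据库中更新状态,此时它又会被当作未处理项目从数据库中选出。在这种情况下,该项目会先从 `Queue<T>` 中移除,然后又被生产者重新加入队列。这就是我在此方案中加入 `ConcurrentBag<T>` 的原因(`HashSet<T>` 不是线程安全的)。注意:`ConcurrentBag<T>` 没有按值删除元素的 `Remove` 方法,若要从缓存中移除指定 ID,需要改用基于 `ConcurrentDictionary<TKey, TValue>` 的集合: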
/// <summary>
/// Processes one item, skipping it if the same id is already in flight. The id is
/// kept in the in-flight cache for a grace period so the poller does not pick the
/// item up again before its database update is visible.
/// </summary>
private static async Task ProcessItem(Item item)
{
    // Atomic check-and-add. It returns false if another worker already claimed this
    // id; a separate Contains-then-Add would let two workers both pass the check.
    if (!_processedItemIds.Add(item.Id))
    {
        return;
    }

    try
    {
        // actual work here
        // save item as processed in database
        // we need to wait to ensure item not to appear in queue again
        await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));
    }
    finally
    {
        // Clear the id even if processing throws, so it does not stay cached forever.
        // NOTE(review): an exception escaping this delegate still faults the ActionBlock.
        _processedItemIds.Remove(item.Id);
    }
}

public class Item
{
    public Guid Id { get; set; }
}

/// <summary>
/// Minimal thread-safe set of ids. ConcurrentBag&lt;T&gt; cannot remove a specific
/// value, so this set is backed by ConcurrentDictionary. It keeps a Contains(Guid)
/// method so existing filters such as <c>_processedItemIds.Contains(id)</c> still work.
/// </summary>
private sealed class ConcurrentIdSet
{
    private readonly ConcurrentDictionary<Guid, byte> _ids = new ConcurrentDictionary<Guid, byte>();

    public bool Contains(Guid id) => _ids.ContainsKey(id);

    /// <summary>Adds the id; returns false if it was already present.</summary>
    public bool Add(Guid id) => _ids.TryAdd(id, 0);

    /// <summary>Removes the id; returns false if it was not present.</summary>
    public bool Remove(Guid id)
    {
        byte ignored;
        return _ids.TryRemove(id, out ignored);
    }
}

// temporary cache for items in process
private static readonly ConcurrentIdSet _processedItemIds = new ConcurrentIdSet();
/// <summary>
/// Placeholder data source, called once per timer tick. It logs the tick and returns
/// a single freshly generated item in place of a real database query.
/// </summary>
private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // Trace which tick fired, and when.
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

    // Stand-in for the database read: one new item with a random id.
    var fakeItem = new Item { Id = Guid.NewGuid() };
    return new[] { fakeItem };
}
您可以通过其他方式实现缓存清理,例如,启动一个 "GC" 计时器,它会定期从缓存中删除已处理的项目。
要停止事件的触发和项目的处理,您应当 `Dispose` 订阅,并视需要对 `ActionBlock` 调用 `Complete`:
// Shutdown order matters: first stop the timer-driven source so no further items are pushed into the block...
subscription.Dispose();
// ...then signal the block that no more input is coming.
// NOTE(review): Complete() only signals; if shutdown must wait for in-flight items, wait on action.Completion.
action.Complete();
有关 Rx.NET 的更多信息,请参阅其 GitHub 上的指南(guidelines)。
这是我正在尝试做的事情:
- 在内存中维护一个待处理项目的队列(即 `IsProcessed = 0` 的项目)
- 每 5 秒从数据库中获取未处理的项目,如果它们尚不在队列中,则将其加入队列
- 持续从队列中取出项目并进行处理;每处理完一个项目,就在数据库中更新它(`IsProcessed = 1`)
- 尽可能并行地完成以上所有工作("as parallel as possible")
我的服务有一个构造函数,例如
/// <summary>
/// Creates the service and hooks the polling timer's Elapsed event up to the
/// queue-filling producer.
/// </summary>
public MyService()
{
    // Explicit delegate construction; equivalent to the method-group shorthand.
    Ticker.Elapsed += new ElapsedEventHandler(FillQueue);
}
我在服务启动时启动该计时器,如
/// <summary>
/// Service start: enables the polling timer and launches the consumer loop.
/// </summary>
protected override void OnStart(string[] args)
{
    Ticker.Enabled = true;

    // ConsumeWork loops forever and blocks with Thread.Sleep. Request a dedicated
    // thread (LongRunning) so it does not permanently occupy a thread-pool worker,
    // as Task.Run would.
    Task.Factory.StartNew(() => ConsumeWork(), TaskCreationOptions.LongRunning);
}
我的 `FillQueue` 如下:
/// <summary>
/// Timer Elapsed handler (producer): loads unprocessed items via
/// GetUnprocessedItemsFromDb and enqueues each one that is not already in Work.
/// </summary>
/// <remarks>
/// Declared as plain void rather than async void. The body awaits nothing, and
/// exceptions escaping an async void method cannot be caught by any caller.
/// NOTE(review): assumes Work is a Queue&lt;T&gt; (not thread-safe) that the consumer
/// thread also reads and dequeues. The check-and-enqueue therefore runs under
/// lock (Work), and every other access to Work must take the same lock. If timer
/// callbacks overlap, the lock also stops them from enqueuing the same item twice.
/// </remarks>
private static void FillQueue(object source, ElapsedEventArgs e)
{
    // Fetch before taking the lock so the consumer is not held up by the query
    // (assumes the returned sequence is already materialized - TODO confirm).
    var items = GetUnprocessedItemsFromDb();

    lock (Work)
    {
        foreach (var item in items)
        {
            if (!Work.Contains(item))
            {
                Work.Enqueue(item);
            }
        }
    }
}
我的 `ConsumeWork` 如下:
/// <summary>
/// Consumer loop. It repeatedly takes the item at the head of Work, processes it,
/// and only then removes it. When the queue is empty it sleeps 500 ms. It never
/// returns.
/// </summary>
/// <remarks>
/// The item stays in the queue while Process runs, so the producer's Work.Contains
/// check still sees it and does not enqueue it a second time.
/// NOTE(review): assumes Work is a Queue&lt;T&gt; (not thread-safe) that the timer
/// thread also writes to. Every access here is therefore made under lock (Work), and
/// all other code touching Work must take the same lock.
/// As in the original, an exception from Process ends this loop.
/// </remarks>
private static void ConsumeWork()
{
    while (true)
    {
        Action processHead = null;

        lock (Work)
        {
            if (Work.Count > 0)
            {
                var item = Work.Peek();
                // Defer the actual processing until the lock is released so the
                // producer is not blocked while the item is being handled.
                processHead = () => Process(item);
            }
        }

        if (processHead == null)
        {
            Thread.Sleep(500);
            continue;
        }

        processHead();

        // No other visible code dequeues, so the head is still the item just processed.
        lock (Work)
        {
            Work.Dequeue();
        }
    }
}
不过,这可能是一种比较幼稚的实现。我想知道 .NET 中是否有现成的类,正好能满足这种场景的需求。
您可以使用 `ActionBlock` 来进行处理,它内置了一个队列,您可以直接向它 `Post` 数据。关于 TPL Dataflow,可以参阅:Intro to TPL-Dataflow 以及 Introduction to Dataflow, Part 1。最后,下面是一个帮助您入门的简单示例。我省略了很多细节,但它至少能让您起步。
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace MyWorkProcessor {
    /// <summary>
    /// Polls a data source on a fixed interval and posts each fetched WorkItem to a
    /// bounded TPL Dataflow ActionBlock that processes it.
    /// </summary>
    public class WorkProcessor {
        public WorkProcessor() {
            Processor = CreatePipeline();
        }

        /// <summary>
        /// Runs the polling loop until <see cref="StopProcessing"/> is called. The
        /// processing block is marked complete whenever the loop ends: normally, by
        /// cancellation, or by an error.
        /// </summary>
        public async Task StartProcessing() {
            try {
                await Task.Run(() => GetWorkFromDatabase());
            } catch (OperationCanceledException) {
                //handle cancel
            } finally {
                // No more items will be posted to the block.
                Processor.Complete();
            }
        }

        /// <summary>
        /// Requests cancellation of both the polling loop and the processing block.
        /// </summary>
        public void StopProcessing() {
            Cancellation.Cancel();
        }

        // Initialized here, before the constructor body runs, because the constructor
        // calls CreatePipeline, which reads Token. In the original the source was never
        // assigned, so constructing a WorkProcessor threw NullReferenceException.
        private CancellationTokenSource Cancellation { get; } = new CancellationTokenSource();

        private ITargetBlock<WorkItem> Processor {
            get;
        }

        private TimeSpan DatabasePollingFrequency {
            get;
        } = TimeSpan.FromSeconds(5);

        /// <summary>
        /// Builds the processing block. It holds at most 100 items and is cancelled
        /// by the same token as the polling loop.
        /// </summary>
        private ITargetBlock<WorkItem> CreatePipeline() {
            var options = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 100,
                CancellationToken = Cancellation.Token
            };
            return new ActionBlock<WorkItem>(item => ProcessWork(item), options);
        }

        /// <summary>
        /// Polling loop: fetch, post, wait. Repeats until cancellation is requested.
        /// </summary>
        private async Task GetWorkFromDatabase() {
            var token = Cancellation.Token;
            while (!token.IsCancellationRequested) {
                var work = await GetWork();
                // SendAsync waits asynchronously while the bounded block is full.
                // Passing the token lets a stop request interrupt that wait and the
                // delay below instead of sitting them out.
                await Processor.SendAsync(work, token);
                await Task.Delay(DatabasePollingFrequency, token);
            }
        }

        private async Task<WorkItem> GetWork() {
            return await Context.GetWork();
        }

        private void ProcessWork(WorkItem item) {
            //do processing
        }
    }
}
尽管 @JSteward 的回答是一个很好的起点,但您还可以通过混合使用 TPL Dataflow 和 Rx.NET 扩展来改进它:数据流块可以很容易地成为您数据的观察者(observer),而借助 Rx 的 Timer(参见 Rx.Timer 的说明),您需要做的工作会少得多。
我们可以根据您的需要改编 MSDN 文章中的示例,如下:
// Polling schedule, in seconds.
private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

// Observable.Timer(dueTime, period) yields 0, 1, 2, ... (Int64). The first tick
// comes after DueIntervalInSeconds and later ticks every EventIntervalInSeconds.
// https://msdn.microsoft.com/en-us/library/hh229435.aspx
var firstTickDelay = TimeSpan.FromSeconds(DueIntervalInSeconds);
var tickPeriod = TimeSpan.FromSeconds(EventIntervalInSeconds);
var ticks = Observable.Timer(firstTickDelay, tickPeriod);

// Stamp each tick with its time, expand it into the items read from the DB, and
// drop any item whose id is already in the in-flight cache.
var source = ticks
    .Timestamp()
    .SelectMany(GetItemsFromDB)
    .Where(candidate => !_processedItemIds.Contains(candidate.Id));

// Allow as many items in flight at once as there are logical processors.
var blockOptions = new ExecutionDataflowBlockOptions();
blockOptions.MaxDegreeOfParallelism = Environment.ProcessorCount;
var action = new ActionBlock<Item>(ProcessItem, blockOptions);

// Feed the timer-driven item stream into the block; dispose this to stop polling.
IDisposable subscription = source.Subscribe(action.AsObserver());
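此外,您对已处理项目的检查并不完全准确:某个项目可能已经处理完毕,但尚未在数据库中更新状态,此时它又会被当作未处理项目从数据库中选出。在这种情况下,该项目会先从 `Queue<T>` 中移除,然后又被生产者重新加入队列。这就是我在此方案中加入 `ConcurrentBag<T>` 的原因(`HashSet<T>` 不是线程安全的)。注意:`ConcurrentBag<T>` 没有按值删除元素的 `Remove` 方法,若要从缓存中移除指定 ID,需要改用基于 `ConcurrentDictionary<TKey, TValue>` 的集合: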
/// <summary>
/// Processes one item, skipping it if the same id is already in flight. The id is
/// kept in the in-flight cache for a grace period so the poller does not pick the
/// item up again before its database update is visible.
/// </summary>
private static async Task ProcessItem(Item item)
{
    // Atomic check-and-add. It returns false if another worker already claimed this
    // id; a separate Contains-then-Add would let two workers both pass the check.
    if (!_processedItemIds.Add(item.Id))
    {
        return;
    }

    try
    {
        // actual work here
        // save item as processed in database
        // we need to wait to ensure item not to appear in queue again
        await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));
    }
    finally
    {
        // Clear the id even if processing throws, so it does not stay cached forever.
        // NOTE(review): an exception escaping this delegate still faults the ActionBlock.
        _processedItemIds.Remove(item.Id);
    }
}

public class Item
{
    public Guid Id { get; set; }
}

/// <summary>
/// Minimal thread-safe set of ids. ConcurrentBag&lt;T&gt; cannot remove a specific
/// value, so this set is backed by ConcurrentDictionary. It keeps a Contains(Guid)
/// method so existing filters such as <c>_processedItemIds.Contains(id)</c> still work.
/// </summary>
private sealed class ConcurrentIdSet
{
    private readonly ConcurrentDictionary<Guid, byte> _ids = new ConcurrentDictionary<Guid, byte>();

    public bool Contains(Guid id) => _ids.ContainsKey(id);

    /// <summary>Adds the id; returns false if it was already present.</summary>
    public bool Add(Guid id) => _ids.TryAdd(id, 0);

    /// <summary>Removes the id; returns false if it was not present.</summary>
    public bool Remove(Guid id)
    {
        byte ignored;
        return _ids.TryRemove(id, out ignored);
    }
}

// temporary cache for items in process
private static readonly ConcurrentIdSet _processedItemIds = new ConcurrentIdSet();
/// <summary>
/// Placeholder data source, called once per timer tick. It logs the tick and returns
/// a single freshly generated item in place of a real database query.
/// </summary>
private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // Trace which tick fired, and when.
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

    // Stand-in for the database read: one new item with a random id.
    var fakeItem = new Item { Id = Guid.NewGuid() };
    return new[] { fakeItem };
}
您可以通过其他方式实现缓存清理,例如,启动一个 "GC" 计时器,它会定期从缓存中删除已处理的项目。
要停止事件的触发和项目的处理,您应当 `Dispose` 订阅,并视需要对 `ActionBlock` 调用 `Complete`:
// Shutdown order matters: first stop the timer-driven source so no further items are pushed into the block...
subscription.Dispose();
// ...then signal the block that no more input is coming.
// NOTE(review): Complete() only signals; if shutdown must wait for in-flight items, wait on action.Completion.
action.Complete();
有关 Rx.NET 的更多信息,请参阅其 GitHub 上的指南(guidelines)。