如何在 C# 中创建一个 "fire & forget" 异步 FIFO 队列?
How to make a "fire & forget" async FIFO queue in c#?
我正在尝试异步处理文档。这个想法是用户将文档发送到服务,这需要时间,稍后会查看结果(每个文档大约 20-90 秒)。
理想情况下,我只想填充某种可以被系统尽快清空的可观察集合。当有一个项目时,处理它并在另一个对象中产生预期的输出,当没有项目时什么也不做。当用户检查输出集合时,他会找到已经处理过的项目。
理想情况下,所有项目从一开始就可见,并且会有一个状态(已完成、正在进行或在队列中),但一旦我知道如何做第一个,我应该能够处理这些状态。
我不确定要使用哪个对象,现在我正在查看 BlockingCollection
,但我认为它不适合这项工作,因为我不能在它的时候填充它从另一端清空。
private BlockingCollection<IDocument> _jobs = new BlockingCollection<IDocument>();
public ObservableCollection<IExtractedDocument> ExtractedDocuments { get; }
public QueueService()
{
ExtractedDocuments = new ObservableCollection<IExtractedDocument>();
}
public async Task Add(string filePath, List<Extra> extras)
{
if (_jobs.IsAddingCompleted || _jobs.IsCompleted)
_jobs = new BlockingCollection<IDocument>();
var doc = new Document(filePath, extras);
_jobs.Add(doc);
_jobs.CompleteAdding();
await ProcessQueue();
}
private async Task ProcessQueue()
{
foreach (var document in _jobs.GetConsumingEnumerable(CancellationToken.None))
{
var resultDocument = await service.ProcessDocument(document);
ExtractedDocuments.Add(resultDocument );
Debug.WriteLine("Job completed");
}
}
这就是我现在的处理方式。如果我删除 CompleteAdding
调用,它会在第二次尝试时挂起。如果我有那个语句,那么我不能只填充队列,我必须先清空它,这违背了目的。
有什么方法可以实现我想要实现的目标吗?一个我将填充的集合,系统将异步和自主地处理?
总而言之,我需要:
- 一个我可以填充的集合,它将被逐步异步处理。可以在处理某些文档或系列或文档时添加它们。
- 处理完成后将填充的输出集合
- UI 线程和应用程序在一切正常时仍然响应 运行
- 我不需要并行处理多个进程,或一次处理一个文档。无论哪个最容易安装和维护都可以(小规模应用)。我假设一次一个更简单。
这里的一个常见模式是有一个在文档状态更改时执行的回调方法。使用后台任务 运行,它会尽可能快地咀嚼扔掉的文件。调用 Dispose 关闭处理器。
如果您需要在 gui 线程上处理回调,则需要以某种方式将回调同步到您的主线程。 Windows 如果您使用的是表单,则表单有执行此操作的方法。
此示例程序实现了所有必要的 类 和接口,您可以根据需要进行微调。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp2
{
class Program
{
private static Task Callback(IExtractedDocument doc, DocumentProcessor.DocState docState)
{
Console.WriteLine("Processing doc {0}, state: {1}", doc, docState);
return Task.CompletedTask;
}
public static void Main()
{
using DocumentProcessor docProcessor = new DocumentProcessor(Callback);
Console.WriteLine("Processor started, press any key to end processing");
for (int i = 0; i < 100; i++)
{
if (Console.KeyAvailable)
{
break;
}
else if (i == 5)
{
// make an error
docProcessor.Add(null);
}
else
{
docProcessor.Add(new Document { Text = "Test text " + Guid.NewGuid().ToString() });
}
Thread.Sleep(500);
}
Console.WriteLine("Doc processor shut down, press ENTER to quit");
Console.ReadLine();
}
public interface IDocument
{
public string Text { get; }
}
public class Document : IDocument
{
public string Text { get; set; }
}
public interface IExtractedDocument : IDocument
{
public IDocument OriginalDocument { get; }
public Exception Error { get; }
}
public class ExtractedDocument : IExtractedDocument
{
public override string ToString()
{
return $"Orig text: {OriginalDocument?.Text}, Extracted Text: {Text}, Error: {Error}";
}
public IDocument OriginalDocument { get; set; }
public string Text { get; set; }
public Exception Error { get; set; }
}
public class DocumentProcessor : IDisposable
{
public enum DocState { Processing, Completed, Error }
private readonly BlockingCollection<IDocument> queue = new BlockingCollection<IDocument>();
private readonly Func<IExtractedDocument, DocState, Task> callback;
private CancellationTokenSource cancelToken = new CancellationTokenSource();
public DocumentProcessor(Func<IExtractedDocument, DocState, Task> callback)
{
this.callback = callback;
Task.Run(() => StartQueueProcessor()).GetAwaiter();
}
public void Dispose()
{
if (!cancelToken.IsCancellationRequested)
{
cancelToken.Cancel();
}
}
public void Add(IDocument doc)
{
if (cancelToken.IsCancellationRequested)
{
throw new InvalidOperationException("Processor is disposed");
}
queue.Add(doc);
}
private void ProcessDocument(IDocument doc)
{
try
{
// do processing
DoCallback(new ExtractedDocument { OriginalDocument = doc }, DocState.Processing);
if (doc is null)
{
throw new ArgumentNullException("Document to process was null");
}
IExtractedDocument successExtractedDocument = DoSomeDocumentProcessing(doc);
DoCallback(successExtractedDocument, DocState.Completed);
}
catch (Exception ex)
{
DoCallback(new ExtractedDocument { OriginalDocument = doc, Error = ex }, DocState.Error);
}
}
private IExtractedDocument DoSomeDocumentProcessing(IDocument originalDocument)
{
return new ExtractedDocument { OriginalDocument = originalDocument, Text = "Extracted: " + originalDocument.Text };
}
private void DoCallback(IExtractedDocument result, DocState docState)
{
if (callback != null)
{
// send callbacks in background
callback(result, docState).GetAwaiter();
}
}
private void StartQueueProcessor()
{
try
{
while (!cancelToken.Token.IsCancellationRequested)
{
if (queue.TryTake(out IDocument doc, 1000, cancelToken.Token))
{
// can chance to Task.Run(() => ProcessDocument(doc)).GetAwaiter() for parallel execution
ProcessDocument(doc);
}
}
}
catch (OperationCanceledException)
{
// ignore, don't need to throw or worry about this
}
while (queue.TryTake(out IDocument doc))
{
DoCallback(new ExtractedDocument { Error = new ObjectDisposedException("Processor was disposed") }, DocState.Error);
}
}
}
}
}
我正在尝试异步处理文档。这个想法是用户将文档发送到服务,这需要时间,稍后会查看结果(每个文档大约 20-90 秒)。
理想情况下,我只想填充某种可以被系统尽快清空的可观察集合。当有一个项目时,处理它并在另一个对象中产生预期的输出,当没有项目时什么也不做。当用户检查输出集合时,他会找到已经处理过的项目。
理想情况下,所有项目从一开始就可见,并且会有一个状态(已完成、正在进行或在队列中),但一旦我知道如何做第一个,我应该能够处理这些状态。
我不确定要使用哪个对象,现在我正在查看 BlockingCollection
,但我认为它不适合这项工作,因为我不能在它的时候填充它从另一端清空。
private BlockingCollection<IDocument> _jobs = new BlockingCollection<IDocument>();
public ObservableCollection<IExtractedDocument> ExtractedDocuments { get; }
public QueueService()
{
ExtractedDocuments = new ObservableCollection<IExtractedDocument>();
}
public async Task Add(string filePath, List<Extra> extras)
{
if (_jobs.IsAddingCompleted || _jobs.IsCompleted)
_jobs = new BlockingCollection<IDocument>();
var doc = new Document(filePath, extras);
_jobs.Add(doc);
_jobs.CompleteAdding();
await ProcessQueue();
}
private async Task ProcessQueue()
{
foreach (var document in _jobs.GetConsumingEnumerable(CancellationToken.None))
{
var resultDocument = await service.ProcessDocument(document);
ExtractedDocuments.Add(resultDocument );
Debug.WriteLine("Job completed");
}
}
这就是我现在的处理方式。如果我删除 CompleteAdding
调用,它会在第二次尝试时挂起。如果我有那个语句,那么我不能只填充队列,我必须先清空它,这违背了目的。
有什么方法可以实现我想要实现的目标吗?一个我将填充的集合,系统将异步和自主地处理?
总而言之,我需要:
- 一个我可以填充的集合,它将被逐步异步处理。可以在处理某些文档或系列或文档时添加它们。
- 处理完成后将填充的输出集合
- UI 线程和应用程序在一切正常时仍然响应 运行
- 我不需要并行处理多个进程,或一次处理一个文档。无论哪个最容易安装和维护都可以(小规模应用)。我假设一次一个更简单。
这里的一个常见模式是有一个在文档状态更改时执行的回调方法。使用后台任务 运行,它会尽可能快地咀嚼扔掉的文件。调用 Dispose 关闭处理器。
如果您需要在 gui 线程上处理回调,则需要以某种方式将回调同步到您的主线程。 Windows 如果您使用的是表单,则表单有执行此操作的方法。
此示例程序实现了所有必要的 类 和接口,您可以根据需要进行微调。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp2
{
class Program
{
private static Task Callback(IExtractedDocument doc, DocumentProcessor.DocState docState)
{
Console.WriteLine("Processing doc {0}, state: {1}", doc, docState);
return Task.CompletedTask;
}
public static void Main()
{
using DocumentProcessor docProcessor = new DocumentProcessor(Callback);
Console.WriteLine("Processor started, press any key to end processing");
for (int i = 0; i < 100; i++)
{
if (Console.KeyAvailable)
{
break;
}
else if (i == 5)
{
// make an error
docProcessor.Add(null);
}
else
{
docProcessor.Add(new Document { Text = "Test text " + Guid.NewGuid().ToString() });
}
Thread.Sleep(500);
}
Console.WriteLine("Doc processor shut down, press ENTER to quit");
Console.ReadLine();
}
public interface IDocument
{
public string Text { get; }
}
public class Document : IDocument
{
public string Text { get; set; }
}
public interface IExtractedDocument : IDocument
{
public IDocument OriginalDocument { get; }
public Exception Error { get; }
}
public class ExtractedDocument : IExtractedDocument
{
public override string ToString()
{
return $"Orig text: {OriginalDocument?.Text}, Extracted Text: {Text}, Error: {Error}";
}
public IDocument OriginalDocument { get; set; }
public string Text { get; set; }
public Exception Error { get; set; }
}
public class DocumentProcessor : IDisposable
{
public enum DocState { Processing, Completed, Error }
private readonly BlockingCollection<IDocument> queue = new BlockingCollection<IDocument>();
private readonly Func<IExtractedDocument, DocState, Task> callback;
private CancellationTokenSource cancelToken = new CancellationTokenSource();
public DocumentProcessor(Func<IExtractedDocument, DocState, Task> callback)
{
this.callback = callback;
Task.Run(() => StartQueueProcessor()).GetAwaiter();
}
public void Dispose()
{
if (!cancelToken.IsCancellationRequested)
{
cancelToken.Cancel();
}
}
public void Add(IDocument doc)
{
if (cancelToken.IsCancellationRequested)
{
throw new InvalidOperationException("Processor is disposed");
}
queue.Add(doc);
}
private void ProcessDocument(IDocument doc)
{
try
{
// do processing
DoCallback(new ExtractedDocument { OriginalDocument = doc }, DocState.Processing);
if (doc is null)
{
throw new ArgumentNullException("Document to process was null");
}
IExtractedDocument successExtractedDocument = DoSomeDocumentProcessing(doc);
DoCallback(successExtractedDocument, DocState.Completed);
}
catch (Exception ex)
{
DoCallback(new ExtractedDocument { OriginalDocument = doc, Error = ex }, DocState.Error);
}
}
private IExtractedDocument DoSomeDocumentProcessing(IDocument originalDocument)
{
return new ExtractedDocument { OriginalDocument = originalDocument, Text = "Extracted: " + originalDocument.Text };
}
private void DoCallback(IExtractedDocument result, DocState docState)
{
if (callback != null)
{
// send callbacks in background
callback(result, docState).GetAwaiter();
}
}
private void StartQueueProcessor()
{
try
{
while (!cancelToken.Token.IsCancellationRequested)
{
if (queue.TryTake(out IDocument doc, 1000, cancelToken.Token))
{
// can chance to Task.Run(() => ProcessDocument(doc)).GetAwaiter() for parallel execution
ProcessDocument(doc);
}
}
}
catch (OperationCanceledException)
{
// ignore, don't need to throw or worry about this
}
while (queue.TryTake(out IDocument doc))
{
DoCallback(new ExtractedDocument { Error = new ObjectDisposedException("Processor was disposed") }, DocState.Error);
}
}
}
}
}