.NET 核心 Web api 带队列处理

.NET core web api with queue processing

如何设置 .NET 核心网站 api

另外,一个不断检查队列并一条一条处理消息的例程。

根据需求,api将作为消息的接收者,一分钟内可能会被点击数百次,而它收到的消息要一条一条处理. 我对 Web api 有点陌生,所以想知道这样的设置是否适合,如果是,如何将不同的组件组合在一起。

提前致谢..

老实说,我认为在一个进程中接收和处理消息没有意义,所以我建议使用像 RabbitMQ or Kafka or any other existing system of your preference, where you can put your messages and another process would consume it. It's quite big topic, you can start from this tutorial

这样的外部消息系统

如果您仍想将它放在一个进程中,这也是可能的,您可以创建一个后台任务队列,将您的消息放在那里并创建 background task,它将从该队列中使用它们。

public interface IBackgroundTaskQueue
{
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private ConcurrentQueue<Func<CancellationToken, Task>> _workItems = 
        new ConcurrentQueue<Func<CancellationToken, Task>>();
    private SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void QueueBackgroundWorkItem(
        Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);
        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}

后台任务:

public class QueuedHostedService : BackgroundService
{
    private readonly ILogger _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
        ILoggerFactory loggerFactory)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected async override Task ExecuteAsync(
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        while (!cancellationToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(cancellationToken);

            try
            {
                await workItem(cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, 
                   $"Error occurred executing {nameof(workItem)}.");
            }
        }

        _logger.LogInformation("Queued Hosted Service is stopping.");
    }
}

报名人数:

public void ConfigureServices(IServiceCollection services)
{
    services.AddHostedService<QueuedHostedService>();
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
}

注入控制器:

public class ApiController
{
    private IBackgroundTaskQueue queue;
    public ApiController(IBackgroundTaskQueue queue)
    {
        this.queue = queue;
    }

    public IActionResult StartProcessing()
    {
        queue.QueueBackgroundWorkItem(async token =>
        {
            // put processing code here
        }

        return Ok();
    }
}

您可以修改 BackgroundTaskQueue 以满足您的要求,但我希望您理解这背后的想法。