使用 Timed webhook 规划架构

Planning architecture with Timed webhook

背景

我有一个用 C# + ASP.NET 核心 (v2.2) 编写的网站,并公开了这个 API:

POST /api/account/signup
POST /api/account/send-greeting

我的业务策略是在注册后恰好 15 分钟向用户发送问候语 (POST /api/account/send-greeting)。

问题

所以,我需要以某种方式注册这个新活动。我虽然有 2 个选项:

  1. 运行 每 1 分钟一个后台任务,查询数据库中的新用户 谁有资格收到这封电子邮件。
  2. 使用分布式队列。就像 Azure 存储队列一样。使用此队列,您可以将可见性超时的消息加入队列。所以你可以定义你现在发送消息到队列,但它只会在 15 分钟后出现在那里。然后您必须部署一个后台服务,该服务将等待队列中的新活动消息并执行它们。

这两个解决方案有明显的缺点:

  1. 解决方案 #1 是天真的解决方案。它消耗了大量的数据库资源,因为它应该每 1 分钟 运行 并查询 table 上的所有注册用户。这效率不高,因为一天中的大部分时间我都没有新注册用户。
  2. 解决方案 #2 太麻烦了。您需要使用队列并部署后台服务才能完成此操作。听起来工作量太大了。

这个任务对我来说听起来很明显。更好的解决方案,我不确定是否存在,可以是: 您向他发送消息的外部服务

POST /api/register-to-timed-callback?when=15m&target-url=http://example.com/api/account/send-greeting

问题

我错过了什么?如何以最简单有效的方式解决这个问题?

您可以根据IHostedService创建排队后台服务。然后,当用户注册并通过后台服务处理该队列时,您将一个项目添加到队列中。当您从队列中取出项目时,您会根据时间检查它是否已准备好发送。如果是这样,您将点击 send-greeting 端点,否则,您将重新排队该项目。 docs 提供了此类服务的示例。

队列:

public interface IBackgroundTaskQueue
{
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private ConcurrentQueue<Func<CancellationToken, Task>> _workItems = 
        new ConcurrentQueue<Func<CancellationToken, Task>>();
    private SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void QueueBackgroundWorkItem(
        Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);
        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}

以及托管服务:

public class QueuedHostedService : BackgroundService
{
    private readonly ILogger _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
        ILoggerFactory loggerFactory)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected async override Task ExecuteAsync(
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        while (!cancellationToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(cancellationToken);

            try
            {
                await workItem(cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, 
                   $"Error occurred executing {nameof(workItem)}.");
            }
        }

        _logger.LogInformation("Queued Hosted Service is stopping.");
    }
}

该代码直接来自文档。它主要支持您的用例,但需要进行一些调整才能让您一路走来。首先,因为有一个时间组件(即你只想处理队列中的项目,如果它是 15 分钟前的),你需要制作 ConcurrentQueue<T> 的类型参数,你可以同时编码日期时间和进入。这可以是 ValueTuple 或专门为此目的创建的实际对象:由您决定。例如:

ConcurrentQueue<(DateTimeOffset added, Func<CancellationToken, Task> task)>

然后,如果没有经过足够的时间,您将需要修改出队逻辑以将其重新入队:

public async Task<Func<CancellationToken, Task>> DequeueAsync(
    CancellationToken cancellationToken)
{
    await _signal.WaitAsync(cancellationToken);
    _workItems.TryDequeue(out var workItem);
    if (DateTimeOffset.UtcNow.AddMinutes(-15) < workItem.added)
    {
        _workItems.Enqueue(workItem);
        return ct => ct.IsCancellationRequested ? Task.FromCanceled(ct) : Task.CompletedTask;;
    }
    return workItem;
}

那里的 return 基本上只是一个满足约束的虚拟 lambda。您可能 return 类似 null 的东西,但是您还需要修改后台服务的 ExecuteAsync 方法以在处理函数之前添加空检查。

还值得注意的是,示例代码被设计为通用的,允许您将 任何 排队等待处理。因此,由于需要进行特定于时间的更改,您应该使用更具体的命名:例如 ITimedBackgroundTaskQueueTimedBackgroundTaskQueueTimedQueuedHostedService。鉴于文档中的示例 interfaces/classes 实际上将集成到 ASP.NET Core 3.0.

中,这一点尤其正确。