使用 Timed webhook 规划架构
Planning architecture with Timed webhook
背景
我有一个用 C# + ASP.NET 核心 (v2.2) 编写的网站,并公开了这个 API:
POST /api/account/signup
POST /api/account/send-greeting
我的业务策略是在注册后恰好 15 分钟向用户发送问候语 (POST /api/account/send-greeting
)。
问题
所以,我需要以某种方式注册这个新活动。我虽然有 2 个选项:
- 运行 每 1 分钟一个后台任务,查询数据库中的新用户
谁有资格收到这封电子邮件。
- 使用分布式队列。就像 Azure 存储队列一样。使用此队列,您可以将可见性超时的消息加入队列。所以你可以定义你现在发送消息到队列,但它只会在 15 分钟后出现在那里。然后您必须部署一个后台服务,该服务将等待队列中的新活动消息并执行它们。
这两个解决方案有明显的缺点:
- 解决方案 #1 是天真的解决方案。它消耗了大量的数据库资源,因为它应该每 1 分钟 运行 并查询 table 上的所有注册用户。这效率不高,因为一天中的大部分时间我都没有新注册用户。
- 解决方案 #2 太麻烦了。您需要使用队列并部署后台服务才能完成此操作。听起来工作量太大了。
这个任务对我来说听起来很明显。更好的解决方案,我不确定是否存在,可以是:
您向他发送消息的外部服务
POST /api/register-to-timed-callback?when=15m&target-url=http://example.com/api/account/send-greeting
问题
我错过了什么?如何以最简单有效的方式解决这个问题?
您可以根据IHostedService
创建排队后台服务。然后,当用户注册并通过后台服务处理该队列时,您将一个项目添加到队列中。当您从队列中取出项目时,您会根据时间检查它是否已准备好发送。如果是这样,您将点击 send-greeting
端点,否则,您将重新排队该项目。 docs 提供了此类服务的示例。
队列:
public interface IBackgroundTaskQueue
{
void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
new ConcurrentQueue<Func<CancellationToken, Task>>();
private SemaphoreSlim _signal = new SemaphoreSlim(0);
public void QueueBackgroundWorkItem(
Func<CancellationToken, Task> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
_workItems.Enqueue(workItem);
_signal.Release();
}
public async Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
return workItem;
}
}
以及托管服务:
public class QueuedHostedService : BackgroundService
{
private readonly ILogger _logger;
public QueuedHostedService(IBackgroundTaskQueue taskQueue,
ILoggerFactory loggerFactory)
{
TaskQueue = taskQueue;
_logger = loggerFactory.CreateLogger<QueuedHostedService>();
}
public IBackgroundTaskQueue TaskQueue { get; }
protected async override Task ExecuteAsync(
CancellationToken cancellationToken)
{
_logger.LogInformation("Queued Hosted Service is starting.");
while (!cancellationToken.IsCancellationRequested)
{
var workItem = await TaskQueue.DequeueAsync(cancellationToken);
try
{
await workItem(cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
$"Error occurred executing {nameof(workItem)}.");
}
}
_logger.LogInformation("Queued Hosted Service is stopping.");
}
}
该代码直接来自文档。它主要支持您的用例,但需要进行一些调整才能让您一路走来。首先,因为有一个时间组件(即你只想处理队列中的项目,如果它是 15 分钟前的),你需要制作 ConcurrentQueue<T>
的类型参数,你可以同时编码日期时间和进入。这可以是 ValueTuple
或专门为此目的创建的实际对象:由您决定。例如:
ConcurrentQueue<(DateTimeOffset added, Func<CancellationToken, Task> task)>
然后,如果没有经过足够的时间,您将需要修改出队逻辑以将其重新入队:
public async Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
if (DateTimeOffset.UtcNow.AddMinutes(-15) < workItem.added)
{
_workItems.Enqueue(workItem);
return ct => ct.IsCancellationRequested ? Task.FromCanceled(ct) : Task.CompletedTask;;
}
return workItem;
}
那里的 return 基本上只是一个满足约束的虚拟 lambda。您可能 return 类似 null 的东西,但是您还需要修改后台服务的 ExecuteAsync
方法以在处理函数之前添加空检查。
还值得注意的是,示例代码被设计为通用的,允许您将 任何 排队等待处理。因此,由于需要进行特定于时间的更改,您应该使用更具体的命名:例如 ITimedBackgroundTaskQueue
、TimedBackgroundTaskQueue
和 TimedQueuedHostedService
。鉴于文档中的示例 interfaces/classes 实际上将集成到 ASP.NET Core 3.0.
中,这一点尤其正确。
背景
我有一个用 C# + ASP.NET 核心 (v2.2) 编写的网站,并公开了这个 API:
POST /api/account/signup
POST /api/account/send-greeting
我的业务策略是在注册后恰好 15 分钟向用户发送问候语 (POST /api/account/send-greeting
)。
问题
所以,我需要以某种方式注册这个新活动。我虽然有 2 个选项:
- 运行 每 1 分钟一个后台任务,查询数据库中的新用户 谁有资格收到这封电子邮件。
- 使用分布式队列。就像 Azure 存储队列一样。使用此队列,您可以将可见性超时的消息加入队列。所以你可以定义你现在发送消息到队列,但它只会在 15 分钟后出现在那里。然后您必须部署一个后台服务,该服务将等待队列中的新活动消息并执行它们。
这两个解决方案有明显的缺点:
- 解决方案 #1 是天真的解决方案。它消耗了大量的数据库资源,因为它应该每 1 分钟 运行 并查询 table 上的所有注册用户。这效率不高,因为一天中的大部分时间我都没有新注册用户。
- 解决方案 #2 太麻烦了。您需要使用队列并部署后台服务才能完成此操作。听起来工作量太大了。
这个任务对我来说听起来很明显。更好的解决方案,我不确定是否存在,可以是: 您向他发送消息的外部服务
POST /api/register-to-timed-callback?when=15m&target-url=http://example.com/api/account/send-greeting
问题
我错过了什么?如何以最简单有效的方式解决这个问题?
您可以根据IHostedService
创建排队后台服务。然后,当用户注册并通过后台服务处理该队列时,您将一个项目添加到队列中。当您从队列中取出项目时,您会根据时间检查它是否已准备好发送。如果是这样,您将点击 send-greeting
端点,否则,您将重新排队该项目。 docs 提供了此类服务的示例。
队列:
public interface IBackgroundTaskQueue
{
void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
new ConcurrentQueue<Func<CancellationToken, Task>>();
private SemaphoreSlim _signal = new SemaphoreSlim(0);
public void QueueBackgroundWorkItem(
Func<CancellationToken, Task> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
_workItems.Enqueue(workItem);
_signal.Release();
}
public async Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
return workItem;
}
}
以及托管服务:
public class QueuedHostedService : BackgroundService
{
private readonly ILogger _logger;
public QueuedHostedService(IBackgroundTaskQueue taskQueue,
ILoggerFactory loggerFactory)
{
TaskQueue = taskQueue;
_logger = loggerFactory.CreateLogger<QueuedHostedService>();
}
public IBackgroundTaskQueue TaskQueue { get; }
protected async override Task ExecuteAsync(
CancellationToken cancellationToken)
{
_logger.LogInformation("Queued Hosted Service is starting.");
while (!cancellationToken.IsCancellationRequested)
{
var workItem = await TaskQueue.DequeueAsync(cancellationToken);
try
{
await workItem(cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
$"Error occurred executing {nameof(workItem)}.");
}
}
_logger.LogInformation("Queued Hosted Service is stopping.");
}
}
该代码直接来自文档。它主要支持您的用例,但需要进行一些调整才能让您一路走来。首先,因为有一个时间组件(即你只想处理队列中的项目,如果它是 15 分钟前的),你需要制作 ConcurrentQueue<T>
的类型参数,你可以同时编码日期时间和进入。这可以是 ValueTuple
或专门为此目的创建的实际对象:由您决定。例如:
ConcurrentQueue<(DateTimeOffset added, Func<CancellationToken, Task> task)>
然后,如果没有经过足够的时间,您将需要修改出队逻辑以将其重新入队:
public async Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
await _signal.WaitAsync(cancellationToken);
_workItems.TryDequeue(out var workItem);
if (DateTimeOffset.UtcNow.AddMinutes(-15) < workItem.added)
{
_workItems.Enqueue(workItem);
return ct => ct.IsCancellationRequested ? Task.FromCanceled(ct) : Task.CompletedTask;;
}
return workItem;
}
那里的 return 基本上只是一个满足约束的虚拟 lambda。您可能 return 类似 null 的东西,但是您还需要修改后台服务的 ExecuteAsync
方法以在处理函数之前添加空检查。
还值得注意的是,示例代码被设计为通用的,允许您将 任何 排队等待处理。因此,由于需要进行特定于时间的更改,您应该使用更具体的命名:例如 ITimedBackgroundTaskQueue
、TimedBackgroundTaskQueue
和 TimedQueuedHostedService
。鉴于文档中的示例 interfaces/classes 实际上将集成到 ASP.NET Core 3.0.