强制按需延迟调用异步方法,并在先前结​​果过期时再次调用

Enforce an async method to be called lazily on demand, and called again when the previous result has expired

我正在使用一个异步 Web API,它需要一个 AccessToken(一个不可变结构)作为每个 API 调用的参数传递。这个AccessToken本身是通过调用同一个WebAPI.

的异步Authenticate方法得到的
class WebApi
{
    public Task<AccessToken> Authenticate(string username, string password);
    public Task PurchaseItem(AccessToken token, int itemId, int quantity);
    // More methods having an AccessToken parameter
}

出于性能原因,我不想在调用 API 的所有其他方法之前调用 Authenticate 方法。我想调用一次,然后将相同的 AccessToken 重复用于多个 API 调用。我的问题是 AccessToken 每 15 分钟过期一次,使用过期的 AccessToken 调用任何 API 方法都会导致 AccessTokenExpiredException。我可以捕获此异常,然后在获取新的 AccessToken 后重试错误的调用,但出于性能原因,我更愿意在 AccessToken 过期之前抢先刷新它。我的应用程序是多线程的,因此多个线程可能会同时尝试 use/refresh 相同的 AccessToken 值,事情很快就会变得非常混乱。

要求是:

  1. 调用 Authenticate 方法的频率不应超过每 15 分钟一次,即使多个线程试图同时调用 Web API 的方法也是如此。
  2. 如果 Authenticate 调用失败,则应在下次需要 AccessToken 时重复调用。此要求优先于先前的要求。缓存和重用故障 Task<AccessToken> 15 分钟是不可接受的。
  3. 只有在实际需要 AccessToken 时才应调用 Authenticate 方法。每 15 分钟使用 Timer 调用它是不可接受的。
  4. AccessToken 只能在创建后的 15 分钟内使用。
  5. 过期机制不应依赖于系统时钟。系统时钟调整不应影响(延长或缩短)有效期。

我的问题是:怎么可能 我以一种满足要求的方式抽象了获取、监控过期和刷新 AccessToken 的功能,同时让我的应用程序的其余部分远离所有这些复杂性?我在想类似于我在这个问题中发现的 AsyncLazy<T> 类型的东西: ,但增强了过期功能。这是一个使用这种类型的假设示例(使用 TimeSpan 参数增强):

private readonly WebApi _webApi = new WebApi();
private readonly AsyncLazy<AccessToken> _accessToken = new AsyncLazy<AccessToken>(
    () => _webApi.Authenticate("xxx", "yyy"), TimeSpan.FromMinutes(15));

async Task Purchase(int itemId, int quantity)
{
    await _webApi.PurchaseItem(await _accessToken, itemId, quantity);
}

顺便说一句,这个问题的灵感来自 , where the OP was trying to solve a similar problem in a different way. The example presented above is contrived. My intention is to self-answer 这个问题,但欢迎任何贡献作为答案。

我想请求避免在评论中发布答案。请使用评论来询问有关此问题的说明,以防需要说明。

“可重置”AsyncLazy<T>equivalent to a single-item asynchronous cache。在这种情况下,基于时间的过期,相似性更加惊人。

我建议使用实际的 AsyncCache<T>;我有 one I'm working on,目前正在负载非常低的类似产品的环境中使用,但尚未在实际生产环境中经过良好测试。

这是 AsyncExpiringLazy<T> class 的一个实现,它本质上是一个 AsyncLazy<T> 添加了过期功能:

/// <summary>
/// Represents an asynchronous operation that is started on first demand,
/// and is subject to an expiration policy. In case of failure the error
/// is propagated, but it's not cached.
/// </summary>
public class AsyncExpiringLazy<TResult>
{
    private readonly object _locker = new object();
    private readonly Func<Task<TResult>> _factory;
    private readonly Func<TResult, TimeSpan> _expirationSelector;
    private Task<(TResult Value, TimeSpan, long)> _task;

    public AsyncExpiringLazy(Func<Task<TResult>> factory,
        Func<TResult, TimeSpan> expirationSelector)
    {
        // Arguments validation omitted
        _factory = factory;
        _expirationSelector = expirationSelector;
    }

    public Task<TResult> Task => GetTask();
    public TaskAwaiter<TResult> GetAwaiter() => GetTask().GetAwaiter();

    public void Reset()
    {
        lock (_locker) if (_task != null && _task.IsCompleted) _task = null;
    }

    private async Task<TResult> GetTask()
    {
        var capturedTask = Volatile.Read(ref _task); // Capture the current task
        if (capturedTask != null)
        {
            // Propagate non-completed tasks without post-completion expiration checks
            if (!capturedTask.IsCompleted)
                return (await capturedTask.ConfigureAwait(false)).Value;

            // At this point the task is completed, so getting its .Result is OK
            var (value, expiration, timestamp) = capturedTask.GetAwaiter().GetResult();
            // Check if the value has expired, using static Stopwatch methods
            TimeSpan elapsed = TimeSpan.FromSeconds(
                (double)(Stopwatch.GetTimestamp() - timestamp) / Stopwatch.Frequency);
            if (elapsed < expiration) return value; // Return the non-expired value
        }

        // First call, or previous value expired, or previous operation failed
        Task<Task<(TResult, TimeSpan, long)>> newTaskTask = null;
        lock (_locker)
        {
            if (_task == capturedTask || _task == null)
            {
                // The current thread is eligible for creating the new task
                Task<(TResult, TimeSpan, long)> newTask = null;
                newTaskTask = new Task<Task<(TResult, TimeSpan, long)>>(async () =>
                {
                    try
                    {
                        var value = await _factory().ConfigureAwait(false);
                        var expiration = _expirationSelector(value);
                        return (value, expiration, Stopwatch.GetTimestamp());
                    }
                    catch
                    {
                        // Discard the failed task before throwing
                        lock (_locker) if (_task == newTask) _task = null;
                        throw;
                    }
                });
                _task = newTask = newTaskTask.Unwrap();
            }
            capturedTask = _task; // Capture the current task again
        }
        if (newTaskTask != null) newTaskTask.RunSynchronously(TaskScheduler.Default);
        return (await capturedTask.ConfigureAwait(false)).Value;
    }
}

此实现是 AsyncLazy<T> class 的修改版本,在 . Here I used the lock statement for synchronization, because using Interlocked 操作中发现需要执行循环来覆盖极端情况,这会使代码比它更晦涩已经是了。

AsyncExpiringLazy<T> 构造函数接受两个委托。第一个(factory)是产生结果的异步工厂,它在调用者的上下文中被调用。第二个(expirationSelector)是有效期的选择器,它是一个TimeSpan,并将产生的结果作为参数。在异步生成结果后立即在未知上下文(通常是 ThreadPool 上下文)上调用此委托。

用法示例:

_webApi = new WebApi();
_accessToken = new AsyncExpiringLazy<AccessToken>(
    async () => await _webApi.Authenticate("xxx", "yyy"), _ => TimeSpan.FromMinutes(15));
await _webApi.PurchaseItem(await _accessToken, itemId, quantity);