强制按需延迟调用异步方法,并在先前结果过期时再次调用
Enforce an async method to be called lazily on demand, and called again when the previous result has expired
我正在使用一个异步 Web API,它需要一个 AccessToken
(一个不可变结构)作为每个 API 调用的参数传递。这个AccessToken
本身是通过调用同一个WebAPI.
的异步Authenticate
方法得到的
class WebApi
{
public Task<AccessToken> Authenticate(string username, string password);
public Task PurchaseItem(AccessToken token, int itemId, int quantity);
// More methods having an AccessToken parameter
}
出于性能原因,我不想在调用 API 的所有其他方法之前调用 Authenticate
方法。我想调用一次,然后将相同的 AccessToken
重复用于多个 API 调用。我的问题是 AccessToken
每 15 分钟过期一次,使用过期的 AccessToken
调用任何 API 方法都会导致 AccessTokenExpiredException
。我可以捕获此异常,然后在获取新的 AccessToken
后重试错误的调用,但出于性能原因,我更愿意在 AccessToken
过期之前抢先刷新它。我的应用程序是多线程的,因此多个线程可能会同时尝试 use/refresh 相同的 AccessToken
值,事情很快就会变得非常混乱。
要求是:
- 调用
Authenticate
方法的频率不应超过每 15 分钟一次,即使多个线程试图同时调用 Web API 的方法也是如此。
- 如果
Authenticate
调用失败,则应在下次需要 AccessToken
时重复调用。此要求优先于先前的要求。缓存和重用故障 Task<AccessToken>
15 分钟是不可接受的。
- 只有在实际需要
AccessToken
时才应调用 Authenticate
方法。每 15 分钟使用 Timer
调用它是不可接受的。
AccessToken
只能在创建后的 15 分钟内使用。
- 过期机制不应依赖于系统时钟。系统时钟调整不应影响(延长或缩短)有效期。
我的问题是:怎么可能
我以一种满足要求的方式抽象了获取、监控过期和刷新 AccessToken
的功能,同时让我的应用程序的其余部分远离所有这些复杂性?我在想类似于我在这个问题中发现的 AsyncLazy<T>
类型的东西:
,但增强了过期功能。这是一个使用这种类型的假设示例(使用 TimeSpan
参数增强):
private readonly WebApi _webApi = new WebApi();
private readonly AsyncLazy<AccessToken> _accessToken = new AsyncLazy<AccessToken>(
() => _webApi.Authenticate("xxx", "yyy"), TimeSpan.FromMinutes(15));
async Task Purchase(int itemId, int quantity)
{
await _webApi.PurchaseItem(await _accessToken, itemId, quantity);
}
顺便说一句,这个问题的灵感来自 , where the OP was trying to solve a similar problem in a different way. The example presented above is contrived. My intention is to self-answer 这个问题,但欢迎任何贡献作为答案。
我想请求避免在评论中发布答案。请使用评论来询问有关此问题的说明,以防需要说明。
“可重置”AsyncLazy<T>
是 equivalent to a single-item asynchronous cache。在这种情况下,基于时间的过期,相似性更加惊人。
我建议使用实际的 AsyncCache<T>
;我有 one I'm working on,目前正在负载非常低的类似产品的环境中使用,但尚未在实际生产环境中经过良好测试。
这是 AsyncExpiringLazy<T>
class 的一个实现,它本质上是一个 AsyncLazy<T>
添加了过期功能:
/// <summary>
/// Represents an asynchronous operation that is started on first demand,
/// and is subject to an expiration policy. In case of failure the error
/// is propagated, but it's not cached.
/// </summary>
public class AsyncExpiringLazy<TResult>
{
private readonly object _locker = new object();
private readonly Func<Task<TResult>> _factory;
private readonly Func<TResult, TimeSpan> _expirationSelector;
private Task<(TResult Value, TimeSpan, long)> _task;
public AsyncExpiringLazy(Func<Task<TResult>> factory,
Func<TResult, TimeSpan> expirationSelector)
{
// Arguments validation omitted
_factory = factory;
_expirationSelector = expirationSelector;
}
public Task<TResult> Task => GetTask();
public TaskAwaiter<TResult> GetAwaiter() => GetTask().GetAwaiter();
public void Reset()
{
lock (_locker) if (_task != null && _task.IsCompleted) _task = null;
}
private async Task<TResult> GetTask()
{
var capturedTask = Volatile.Read(ref _task); // Capture the current task
if (capturedTask != null)
{
// Propagate non-completed tasks without post-completion expiration checks
if (!capturedTask.IsCompleted)
return (await capturedTask.ConfigureAwait(false)).Value;
// At this point the task is completed, so getting its .Result is OK
var (value, expiration, timestamp) = capturedTask.GetAwaiter().GetResult();
// Check if the value has expired, using static Stopwatch methods
TimeSpan elapsed = TimeSpan.FromSeconds(
(double)(Stopwatch.GetTimestamp() - timestamp) / Stopwatch.Frequency);
if (elapsed < expiration) return value; // Return the non-expired value
}
// First call, or previous value expired, or previous operation failed
Task<Task<(TResult, TimeSpan, long)>> newTaskTask = null;
lock (_locker)
{
if (_task == capturedTask || _task == null)
{
// The current thread is eligible for creating the new task
Task<(TResult, TimeSpan, long)> newTask = null;
newTaskTask = new Task<Task<(TResult, TimeSpan, long)>>(async () =>
{
try
{
var value = await _factory().ConfigureAwait(false);
var expiration = _expirationSelector(value);
return (value, expiration, Stopwatch.GetTimestamp());
}
catch
{
// Discard the failed task before throwing
lock (_locker) if (_task == newTask) _task = null;
throw;
}
});
_task = newTask = newTaskTask.Unwrap();
}
capturedTask = _task; // Capture the current task again
}
if (newTaskTask != null) newTaskTask.RunSynchronously(TaskScheduler.Default);
return (await capturedTask.ConfigureAwait(false)).Value;
}
}
此实现是 AsyncLazy<T>
class 的修改版本,在 . Here I used the lock
statement for synchronization, because using Interlocked
操作中发现需要执行循环来覆盖极端情况,这会使代码比它更晦涩已经是了。
AsyncExpiringLazy<T>
构造函数接受两个委托。第一个(factory
)是产生结果的异步工厂,它在调用者的上下文中被调用。第二个(expirationSelector
)是有效期的选择器,它是一个TimeSpan
,并将产生的结果作为参数。在异步生成结果后立即在未知上下文(通常是 ThreadPool
上下文)上调用此委托。
用法示例:
_webApi = new WebApi();
_accessToken = new AsyncExpiringLazy<AccessToken>(
async () => await _webApi.Authenticate("xxx", "yyy"), _ => TimeSpan.FromMinutes(15));
await _webApi.PurchaseItem(await _accessToken, itemId, quantity);
我正在使用一个异步 Web API,它需要一个 AccessToken
(一个不可变结构)作为每个 API 调用的参数传递。这个AccessToken
本身是通过调用同一个WebAPI.
Authenticate
方法得到的
class WebApi
{
public Task<AccessToken> Authenticate(string username, string password);
public Task PurchaseItem(AccessToken token, int itemId, int quantity);
// More methods having an AccessToken parameter
}
出于性能原因,我不想在调用 API 的所有其他方法之前调用 Authenticate
方法。我想调用一次,然后将相同的 AccessToken
重复用于多个 API 调用。我的问题是 AccessToken
每 15 分钟过期一次,使用过期的 AccessToken
调用任何 API 方法都会导致 AccessTokenExpiredException
。我可以捕获此异常,然后在获取新的 AccessToken
后重试错误的调用,但出于性能原因,我更愿意在 AccessToken
过期之前抢先刷新它。我的应用程序是多线程的,因此多个线程可能会同时尝试 use/refresh 相同的 AccessToken
值,事情很快就会变得非常混乱。
要求是:
- 调用
Authenticate
方法的频率不应超过每 15 分钟一次,即使多个线程试图同时调用 Web API 的方法也是如此。 - 如果
Authenticate
调用失败,则应在下次需要AccessToken
时重复调用。此要求优先于先前的要求。缓存和重用故障Task<AccessToken>
15 分钟是不可接受的。 - 只有在实际需要
AccessToken
时才应调用Authenticate
方法。每 15 分钟使用Timer
调用它是不可接受的。 AccessToken
只能在创建后的 15 分钟内使用。- 过期机制不应依赖于系统时钟。系统时钟调整不应影响(延长或缩短)有效期。
我的问题是:怎么可能
我以一种满足要求的方式抽象了获取、监控过期和刷新 AccessToken
的功能,同时让我的应用程序的其余部分远离所有这些复杂性?我在想类似于我在这个问题中发现的 AsyncLazy<T>
类型的东西:
TimeSpan
参数增强):
private readonly WebApi _webApi = new WebApi();
private readonly AsyncLazy<AccessToken> _accessToken = new AsyncLazy<AccessToken>(
() => _webApi.Authenticate("xxx", "yyy"), TimeSpan.FromMinutes(15));
async Task Purchase(int itemId, int quantity)
{
await _webApi.PurchaseItem(await _accessToken, itemId, quantity);
}
顺便说一句,这个问题的灵感来自
我想请求避免在评论中发布答案。请使用评论来询问有关此问题的说明,以防需要说明。
“可重置”AsyncLazy<T>
是 equivalent to a single-item asynchronous cache。在这种情况下,基于时间的过期,相似性更加惊人。
我建议使用实际的 AsyncCache<T>
;我有 one I'm working on,目前正在负载非常低的类似产品的环境中使用,但尚未在实际生产环境中经过良好测试。
这是 AsyncExpiringLazy<T>
class 的一个实现,它本质上是一个 AsyncLazy<T>
添加了过期功能:
/// <summary>
/// Represents an asynchronous operation that is started on first demand,
/// and is subject to an expiration policy. In case of failure the error
/// is propagated, but it's not cached.
/// </summary>
public class AsyncExpiringLazy<TResult>
{
private readonly object _locker = new object();
private readonly Func<Task<TResult>> _factory;
private readonly Func<TResult, TimeSpan> _expirationSelector;
private Task<(TResult Value, TimeSpan, long)> _task;
public AsyncExpiringLazy(Func<Task<TResult>> factory,
Func<TResult, TimeSpan> expirationSelector)
{
// Arguments validation omitted
_factory = factory;
_expirationSelector = expirationSelector;
}
public Task<TResult> Task => GetTask();
public TaskAwaiter<TResult> GetAwaiter() => GetTask().GetAwaiter();
public void Reset()
{
lock (_locker) if (_task != null && _task.IsCompleted) _task = null;
}
private async Task<TResult> GetTask()
{
var capturedTask = Volatile.Read(ref _task); // Capture the current task
if (capturedTask != null)
{
// Propagate non-completed tasks without post-completion expiration checks
if (!capturedTask.IsCompleted)
return (await capturedTask.ConfigureAwait(false)).Value;
// At this point the task is completed, so getting its .Result is OK
var (value, expiration, timestamp) = capturedTask.GetAwaiter().GetResult();
// Check if the value has expired, using static Stopwatch methods
TimeSpan elapsed = TimeSpan.FromSeconds(
(double)(Stopwatch.GetTimestamp() - timestamp) / Stopwatch.Frequency);
if (elapsed < expiration) return value; // Return the non-expired value
}
// First call, or previous value expired, or previous operation failed
Task<Task<(TResult, TimeSpan, long)>> newTaskTask = null;
lock (_locker)
{
if (_task == capturedTask || _task == null)
{
// The current thread is eligible for creating the new task
Task<(TResult, TimeSpan, long)> newTask = null;
newTaskTask = new Task<Task<(TResult, TimeSpan, long)>>(async () =>
{
try
{
var value = await _factory().ConfigureAwait(false);
var expiration = _expirationSelector(value);
return (value, expiration, Stopwatch.GetTimestamp());
}
catch
{
// Discard the failed task before throwing
lock (_locker) if (_task == newTask) _task = null;
throw;
}
});
_task = newTask = newTaskTask.Unwrap();
}
capturedTask = _task; // Capture the current task again
}
if (newTaskTask != null) newTaskTask.RunSynchronously(TaskScheduler.Default);
return (await capturedTask.ConfigureAwait(false)).Value;
}
}
此实现是 AsyncLazy<T>
class 的修改版本,在 lock
statement for synchronization, because using Interlocked
操作中发现需要执行循环来覆盖极端情况,这会使代码比它更晦涩已经是了。
AsyncExpiringLazy<T>
构造函数接受两个委托。第一个(factory
)是产生结果的异步工厂,它在调用者的上下文中被调用。第二个(expirationSelector
)是有效期的选择器,它是一个TimeSpan
,并将产生的结果作为参数。在异步生成结果后立即在未知上下文(通常是 ThreadPool
上下文)上调用此委托。
用法示例:
_webApi = new WebApi();
_accessToken = new AsyncExpiringLazy<AccessToken>(
async () => await _webApi.Authenticate("xxx", "yyy"), _ => TimeSpan.FromMinutes(15));
await _webApi.PurchaseItem(await _accessToken, itemId, quantity);