回绕速率限制 API 调用
Wrapping rate limiting API call
我可以访问一个 API 呼叫,该呼叫 每秒接受最大呼叫速率 。如果超过速率,则会抛出 异常 。
我想将此调用包装到一个抽象中,该抽象执行必要的操作以将调用率保持在限制范围内。它就像一个网络路由器:处理多个呼叫并将结果 return 发送给关心呼叫率的正确呼叫者。目标是使调用代码尽可能不知道该限制。否则,代码中具有此调用的每个部分都必须包装到 try-catch 中!
例如: 想象一下,您可以从外部 API 调用一个可以将 2 个数字相加的方法。这个API每秒可以调用5次。任何高于此值的值都会导致异常。
为了说明问题,限制调用率的外部服务就像这个问题的答案
附加信息:
由于您不希望每次从代码的任何部分调用此方法时都担心该限制,因此您考虑设计一个包装器方法,您可以在调用时不必担心速率限制。在内部你关心限制,但在外部你暴露了一个简单的异步方法。
它类似于网络服务器。它如何 return 将正确的结果包提供给正确的客户?
多个调用者将调用此方法,他们将得到结果。这种抽象应该像代理一样。
我该怎么做?
我确定包装方法的公司应该像
public async Task<Results> MyMethod()
在方法内部,它将执行逻辑,可能使用 Reactive Extensions (Buffer)。我不知道。
但是怎么办?我的意思是,多次调用此方法应该 return 将结果发送给正确的调用者。这甚至可能吗?
非常感谢!
实现这一点的一个变体是确保调用之间的最短时间,如下所示:
private readonly Object syncLock = new Object();
private readonly TimeSpan minTimeout = TimeSpan.FromSeconds(5);
private volatile DateTime nextCallDate = DateTime.MinValue;
public async Task<Result> RequestData(...) {
DateTime possibleCallDate = DateTime.Now;
lock(syncLock) {
// When is it possible to make the next call?
if (nextCallDate > possibleCallDate) {
possibleCallDate = nextCallDate;
}
nextCallDate = possibleCallDate + minTimeout;
}
TimeSpan waitingTime = possibleCallDate - DateTime.Now;
if (waitingTime > TimeSpan.Zero) {
await Task.Delay(waitingTime);
}
return await ... /* the actual call to API */ ...;
}
具体应该做什么取决于您的目标和局限性。我的假设:
- 您想在速率限制器生效时避免发出请求
- 您无法预测某个特定请求是否会被拒绝,或者需要多长时间才能再次允许另一个请求
- 你不需要同时发起多个请求,当多个请求等待时,它们的完成顺序无关紧要
如果这些假设有效,您可以使用AsyncAutoResetEvent
from AsyncEx:在发出请求之前等待设置,在成功发出请求后设置,并在速率受限时延迟设置。
代码可以如下所示:
class RateLimitedWrapper<TException> where TException : Exception
{
private readonly AsyncAutoResetEvent autoResetEvent = new AsyncAutoResetEvent(set: true);
public async Task<T> Execute<T>(Func<Task<T>> func)
{
while (true)
{
try
{
await autoResetEvent.WaitAsync();
var result = await func();
autoResetEvent.Set();
return result;
}
catch (TException)
{
var ignored = Task.Delay(500).ContinueWith(_ => autoResetEvent.Set());
}
}
}
}
用法:
public static Task<int> Add(int a, int b)
{
return rateLimitedWrapper.Execute(() => rateLimitingCalculator.Add(a, b));
}
有可用的速率限制库(请参阅 Esendex 的 TokenBucket Github or Nuget)。
用法很简单,这个例子将轮询限制为每秒 1 次
// Create a token bucket with a capacity of 1 token that refills at a fixed interval of 1 token/sec.
ITokenBucket bucket = TokenBuckets.Construct()
.WithCapacity(1)
.WithFixedIntervalRefillStrategy(1, TimeSpan.FromSeconds(1))
.Build();
// ...
while (true)
{
// Consume a token from the token bucket. If a token is not available this method will block until
// the refill strategy adds one to the bucket.
bucket.Consume(1);
Poll();
}
我的一个项目也需要异步,我简单做了一个扩展方法:
public static class TokenBucketExtensions
{
public static Task ConsumeAsync(this ITokenBucket tokenBucket)
{
return Task.Factory.StartNew(tokenBucket.Consume);
}
}
使用它你不需要 throw/catch 异常并且编写包装器变得相当简单
我可以访问一个 API 呼叫,该呼叫 每秒接受最大呼叫速率 。如果超过速率,则会抛出 异常 。
我想将此调用包装到一个抽象中,该抽象执行必要的操作以将调用率保持在限制范围内。它就像一个网络路由器:处理多个呼叫并将结果 return 发送给关心呼叫率的正确呼叫者。目标是使调用代码尽可能不知道该限制。否则,代码中具有此调用的每个部分都必须包装到 try-catch 中!
例如: 想象一下,您可以从外部 API 调用一个可以将 2 个数字相加的方法。这个API每秒可以调用5次。任何高于此值的值都会导致异常。
为了说明问题,限制调用率的外部服务就像这个问题的答案
附加信息:
由于您不希望每次从代码的任何部分调用此方法时都担心该限制,因此您考虑设计一个包装器方法,您可以在调用时不必担心速率限制。在内部你关心限制,但在外部你暴露了一个简单的异步方法。
它类似于网络服务器。它如何 return 将正确的结果包提供给正确的客户?
多个调用者将调用此方法,他们将得到结果。这种抽象应该像代理一样。
我该怎么做?
我确定包装方法的公司应该像
public async Task<Results> MyMethod()
在方法内部,它将执行逻辑,可能使用 Reactive Extensions (Buffer)。我不知道。
但是怎么办?我的意思是,多次调用此方法应该 return 将结果发送给正确的调用者。这甚至可能吗?
非常感谢!
实现这一点的一个变体是确保调用之间的最短时间,如下所示:
private readonly Object syncLock = new Object();
private readonly TimeSpan minTimeout = TimeSpan.FromSeconds(5);
private volatile DateTime nextCallDate = DateTime.MinValue;
public async Task<Result> RequestData(...) {
DateTime possibleCallDate = DateTime.Now;
lock(syncLock) {
// When is it possible to make the next call?
if (nextCallDate > possibleCallDate) {
possibleCallDate = nextCallDate;
}
nextCallDate = possibleCallDate + minTimeout;
}
TimeSpan waitingTime = possibleCallDate - DateTime.Now;
if (waitingTime > TimeSpan.Zero) {
await Task.Delay(waitingTime);
}
return await ... /* the actual call to API */ ...;
}
具体应该做什么取决于您的目标和局限性。我的假设:
- 您想在速率限制器生效时避免发出请求
- 您无法预测某个特定请求是否会被拒绝,或者需要多长时间才能再次允许另一个请求
- 你不需要同时发起多个请求,当多个请求等待时,它们的完成顺序无关紧要
如果这些假设有效,您可以使用AsyncAutoResetEvent
from AsyncEx:在发出请求之前等待设置,在成功发出请求后设置,并在速率受限时延迟设置。
代码可以如下所示:
class RateLimitedWrapper<TException> where TException : Exception
{
private readonly AsyncAutoResetEvent autoResetEvent = new AsyncAutoResetEvent(set: true);
public async Task<T> Execute<T>(Func<Task<T>> func)
{
while (true)
{
try
{
await autoResetEvent.WaitAsync();
var result = await func();
autoResetEvent.Set();
return result;
}
catch (TException)
{
var ignored = Task.Delay(500).ContinueWith(_ => autoResetEvent.Set());
}
}
}
}
用法:
public static Task<int> Add(int a, int b)
{
return rateLimitedWrapper.Execute(() => rateLimitingCalculator.Add(a, b));
}
有可用的速率限制库(请参阅 Esendex 的 TokenBucket Github or Nuget)。
用法很简单,这个例子将轮询限制为每秒 1 次
// Create a token bucket with a capacity of 1 token that refills at a fixed interval of 1 token/sec.
ITokenBucket bucket = TokenBuckets.Construct()
.WithCapacity(1)
.WithFixedIntervalRefillStrategy(1, TimeSpan.FromSeconds(1))
.Build();
// ...
while (true)
{
// Consume a token from the token bucket. If a token is not available this method will block until
// the refill strategy adds one to the bucket.
bucket.Consume(1);
Poll();
}
我的一个项目也需要异步,我简单做了一个扩展方法:
public static class TokenBucketExtensions
{
public static Task ConsumeAsync(this ITokenBucket tokenBucket)
{
return Task.Factory.StartNew(tokenBucket.Consume);
}
}
使用它你不需要 throw/catch 异常并且编写包装器变得相当简单