回绕速率限制 API 调用

Wrapping rate limiting API call

我可以访问一个 API 呼叫,该呼叫 每秒接受最大呼叫速率 。如果超过速率,则会抛出 异常

我想将此调用包装到一个抽象中,该抽象执行必要的操作以将调用率保持在限制范围内。它就像一个网络路由器:处理多个呼叫并将结果 return 发送给关心呼叫率的正确呼叫者。目标是使调用代码尽可能不知道该限制。否则,代码中具有此调用的每个部分都必须包装到 try-catch 中!

例如: 想象一下,您可以从外部 API 调用一个可以将 2 个数字相加的方法。这个API每秒可以调用5次。任何高于此值的值都会导致异常。

为了说明问题,限制调用率的外部服务就像这个问题的答案

附加信息:

由于您不希望每次从代码的任何部分调用此方法时都担心该限制,因此您考虑设计一个包装器方法,您可以在调用时不必担心速率限制。在内部你关心限制,但在外部你暴露了一个简单的异步方法。

它类似于网络服务器。它如何 return 将正确的结果包提供给正确的客户?

多个调用者将调用此方法,他们将得到结果。这种抽象应该像代理一样。

我该怎么做?

我确定包装方法的公司应该像

public async Task<Results> MyMethod()

在方法内部,它将执行逻辑,可能使用 Reactive Extensions (Buffer)。我不知道。

但是怎么办?我的意思是,多次调用此方法应该 return 将结果发送给正确的调用者。这甚至可能吗?

非常感谢!

实现这一点的一个变体是确保调用之间的最短时间,如下所示:

private readonly Object syncLock = new Object();
private readonly TimeSpan minTimeout = TimeSpan.FromSeconds(5);
private volatile DateTime nextCallDate = DateTime.MinValue;

public async Task<Result> RequestData(...) {
    DateTime possibleCallDate = DateTime.Now;
    lock(syncLock) {
        // When is it possible to make the next call?
        if (nextCallDate > possibleCallDate) {
            possibleCallDate = nextCallDate;
        }
        nextCallDate = possibleCallDate + minTimeout;
    }

    TimeSpan waitingTime = possibleCallDate - DateTime.Now;
    if (waitingTime > TimeSpan.Zero) {
        await Task.Delay(waitingTime);
    }

    return await ... /* the actual call to API */ ...;
}

具体应该做什么取决于您的目标和局限性。我的假设:

  • 您想在速率限制器生效时避免发出请求
  • 您无法预测某个特定请求是否会被拒绝,或者需要多长时间才能再次允许另一个请求
  • 你不需要同时发起多个请求,当多个请求等待时,它们的完成顺序无关紧要

如果这些假设有效,您可以使用AsyncAutoResetEvent from AsyncEx:在发出请求之前等待设置,在成功发出请求后设置,并在速率受限时延迟设置。

代码可以如下所示:

class RateLimitedWrapper<TException> where TException : Exception
{
    private readonly AsyncAutoResetEvent autoResetEvent = new AsyncAutoResetEvent(set: true);

    public async Task<T> Execute<T>(Func<Task<T>> func) 
    {
        while (true)
        {
            try
            {
                await autoResetEvent.WaitAsync();

                var result = await func();

                autoResetEvent.Set();

                return result;
            }
            catch (TException)
            {
                var ignored = Task.Delay(500).ContinueWith(_ => autoResetEvent.Set());
            }
        }
    }
}

用法:

public static Task<int> Add(int a, int b)
{
    return rateLimitedWrapper.Execute(() => rateLimitingCalculator.Add(a, b));
}

有可用的速率限制库(请参阅 Esendex 的 TokenBucket Github or Nuget)。

用法很简单,这个例子将轮询限制为每秒 1 次

// Create a token bucket with a capacity of 1 token that refills at a fixed interval of 1 token/sec.
ITokenBucket bucket = TokenBuckets.Construct()
  .WithCapacity(1)
  .WithFixedIntervalRefillStrategy(1, TimeSpan.FromSeconds(1))
  .Build();

// ...

while (true)
{
  // Consume a token from the token bucket.  If a token is not available this method will block until
  // the refill strategy adds one to the bucket.
  bucket.Consume(1);

  Poll();
}

我的一个项目也需要异步,我简单做了一个扩展方法:

public static class TokenBucketExtensions
{
    public static Task ConsumeAsync(this ITokenBucket tokenBucket)
    {
        return Task.Factory.StartNew(tokenBucket.Consume);
    }
}

使用它你不需要 throw/catch 异常并且编写包装器变得相当简单