分区:如何在每个分区后添加等待

Partition: How to add a wait after every partition

我有一个API,每分钟接受20个请求,之后,我需要等待1分钟才能查询它。我有一个项目列表(通常超过 1000 个),我需要从 API 中查询其详细信息,我的想法是我可以使用 Partitioner 将我的列表划分为 20 items/requests 但很快我意识到 Partitioner 不是那样工作的,我的第二个想法是在分区中添加一个 delay 但这也是一个坏主意,根据我的理解,它会在每个不需要的请求之后增加延迟,相反,我需要在每 Partition 之后延迟一次。下面是我的代码:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}

有谁知道我怎样才能做到这一点?

这里有一个 RateLimiter class,您可以使用它来限制异步操作​​的频率。它是 this 答案中 RateLimiter class 的更简单实现。

/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts;
    private readonly CancellationToken _disposeToken;
    private bool _disposed;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
        _disposeCts = new CancellationTokenSource();
        _disposeToken = _disposeCts.Token;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private async void ScheduleSemaphoreRelease()
    {
        try { await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Ignore
        lock (_semaphore) { if (!_disposed) _semaphore.Release(); }
    }

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    {
        lock (_semaphore)
        {
            if (_disposed) return;
            _semaphore.Dispose();
            _disposed = true;
            _disposeCts.Cancel();
            _disposeCts.Dispose();
        }
    }
}

用法示例:

List<string> urls = GetUrls();

using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));

注意:我添加了一个Dispose方法,可以取消RateLimiterclass内部发起的异步操作。这个 使用完 RateLimiter 后应调用方法,否则挂起的异步操作将阻止 RateLimiter 除了消耗与活动 Task.Delay 任务相关的资源外,还可以及时收集垃圾。 原始的非常简单但有漏洞的实现可以在这个答案的 2nd revision 中找到。


我正在添加 RateLimiter class 的替代实现,更复杂,它基于 Stopwatch 而不是 SemaphoreSlim。它的优点是它不需要是一次性的,因为它不会在后台启动隐藏的异步操作。缺点是 WaitAsync 方法不支持 CancellationToken 参数,而且由于复杂,出现错误的概率较高。

public class RateLimiter
{
    private readonly Stopwatch _stopwatch;
    private readonly Queue<TimeSpan> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    }

    public Task WaitAsync()
    {
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        {
            var currentTimestamp = _stopwatch.Elapsed;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                var refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        }
        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    }
}