分区:如何在每个分区后添加等待
Partition: How to add a wait after every partition
我有一个 API,每分钟最多接受 20 个请求;超过之后,我需要等待 1 分钟才能再次查询它。我有一个项目列表(通常超过 1000 个),需要从 API 中查询每一项的详细信息。我最初的想法是使用 Partitioner 把列表划分为每组 20 个项目/请求,但很快发现 Partitioner 并不是这样工作的。我的第二个想法是在分区中添加一个 delay,但这也不是好主意:按我的理解,它会在每个请求之后都增加一次延迟,而这并不是我需要的;我需要的是在每个 Partition 之后只延迟一次。下面是我的代码:
/// <summary>
/// Runs <paramref name="body"/> for every element of <paramref name="source"/>
/// on at most <paramref name="degreeOfParallelism"/> concurrent workers and
/// collects all of the results.
/// </summary>
/// <remarks>
/// Each worker drains one partition produced by <see cref="Partitioner"/>. When
/// <paramref name="delay"/> is positive, the worker pauses for that many seconds
/// after every item it processes. The pause is per item, not per batch.
/// The results are concatenated partition by partition, so their order may not
/// match the order of <paramref name="source"/>.
/// </remarks>
/// <param name="source">The items to process.</param>
/// <param name="degreeOfParallelism">The number of partitions (concurrent workers).</param>
/// <param name="body">The asynchronous operation to run for each item.</param>
/// <param name="token">Checked before each item and observed during each pause.</param>
/// <param name="delay">Seconds to wait after each item. Zero or negative means no wait.</param>
/// <returns>The results of all invocations of <paramref name="body"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
/// <exception cref="OperationCanceledException"><paramref name="token"/> was canceled.</exception>
public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));

    // Task.Delay rejects negative spans (other than -1 ms), so a non-positive
    // delay is treated as "no pause" instead of failing inside a worker.
    var pause = delay > 0 ? TimeSpan.FromSeconds(delay) : TimeSpan.Zero;

    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate
        {
            var allResponses = new List<V>();
            using (partition)
            {
                while (partition.MoveNext())
                {
                    token.ThrowIfCancellationRequested();
                    allResponses.Add(await body(partition.Current));
                    if (pause > TimeSpan.Zero)
                    {
                        // Pass the token so a cancellation is not stuck behind a long pause.
                        await Task.Delay(pause, token);
                    }
                }
            }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}
有谁知道我怎样才能做到这一点?
下面是一个 RateLimiter 类,可用于限制异步操作的频率。它是“this”所链接的那个答案中 RateLimiter 类的简化实现。
/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
/// <remarks>
/// Each completed <see cref="WaitAsync"/> takes one of the
/// <c>maxActionsPerTimeUnit</c> slots. The slot is given back <c>timeUnit</c>
/// after it was taken. Call <see cref="Dispose"/> when finished, to cancel the
/// pending release timers.
/// </remarks>
public class RateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts;
    // Cached copy: reading _disposeCts.Token after the CTS is disposed would throw.
    private readonly CancellationToken _disposeToken;
    // Dedicated lock that serializes slot releases against Dispose.
    private readonly object _gate = new object();
    private bool _disposed;

    /// <summary>Creates a limiter that allows at most
    /// <paramref name="maxActionsPerTimeUnit"/> acquisitions per
    /// <paramref name="timeUnit"/>.</summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxActionsPerTimeUnit"/> is less than 1, or
    /// <paramref name="timeUnit"/> is negative or too large for Task.Delay.
    /// </exception>
    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
        _disposeCts = new CancellationTokenSource();
        _disposeToken = _disposeCts.Token;
    }

    /// <summary>
    /// Waits until a slot is free, takes it, and schedules its release after
    /// the time unit has elapsed.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait for a free slot.</param>
    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Deliberately fire-and-forget; disposal-cancellation is handled inside.
        _ = ScheduleSemaphoreReleaseAsync();
    }

    // Returns Task rather than being `async void`. An async void method reports
    // itself to the current SynchronizationContext (OperationStarted/Completed).
    // Under classic ASP.NET, for example, the request would then be treated as
    // having a pending asynchronous operation for a whole time unit.
    private async Task ScheduleSemaphoreReleaseAsync()
    {
        try { await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Disposed: stop waiting; the release below is skipped.
        lock (_gate) { if (!_disposed) _semaphore.Release(); }
    }

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    {
        lock (_gate)
        {
            if (_disposed) return;
            _semaphore.Dispose();
            _disposed = true;
            // Cancels every pending release timer.
            _disposeCts.Cancel();
            _disposeCts.Dispose();
        }
    }
}
用法示例:
// Example: download every URL while allowing at most 20 requests per one-minute window.
List<string> urls = GetUrls();
using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

// Each download first waits for a free slot in the limiter, then issues the request.
async Task<string> DownloadAsync(string url)
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}

string[] documents = await Task.WhenAll(urls.Select(DownloadAsync));
注意:我添加了一个 Dispose 方法,用于取消 RateLimiter 类内部发起的异步操作。使用完 RateLimiter 后应调用此方法;否则,挂起的异步操作除了会占用与活动 Task.Delay 任务相关的资源外,还会阻止 RateLimiter 被及时垃圾回收。
最初那个非常简单但有缺陷的实现,可以在本答案的第 2 个修订版(2nd revision)中找到。
下面再给出 RateLimiter 类的另一种更复杂的实现,它基于 Stopwatch 而不是 SemaphoreSlim。它的优点是不需要释放(即不必实现 IDisposable),因为它不会在后台启动隐藏的异步操作。缺点是 WaitAsync 方法不支持 CancellationToken 参数,而且由于实现较复杂,出错的可能性也更高。
/// <summary>
/// Limits the number of workflows that can start during the specified time
/// span, without starting any background operations (so it needs no disposal).
/// </summary>
/// <remarks>
/// For every granted action the queue holds the moment (on the internal
/// stopwatch) at which that action's slot expires: its scheduled start plus the
/// time unit. Entries are stored in the order the actions were granted.
/// </remarks>
public class RateLimiter
{
    private readonly Stopwatch _stopwatch;
    private readonly Queue<TimeSpan> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    /// <summary>Creates a limiter that allows at most
    /// <paramref name="maxActionsPerTimeUnit"/> actions per
    /// <paramref name="timeUnit"/>.</summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxActionsPerTimeUnit"/> is less than 1, or
    /// <paramref name="timeUnit"/> is negative or too large for Task.Delay.
    /// </exception>
    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Without this check, a value below 1 makes the first WaitAsync call
        // First() on an empty sequence and throw InvalidOperationException.
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    }

    /// <summary>
    /// Reserves the next available start time and returns a task that completes
    /// when the caller may start its action. The returned task is already
    /// completed when no waiting is needed.
    /// </summary>
    public Task WaitAsync()
    {
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        {
            var currentTimestamp = _stopwatch.Elapsed;
            // Drop the slots whose expiration time has already passed.
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                // The new action may start once the entry _maxActionsPerTimeUnit
                // positions before the end of the queue has expired.
                var refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            }
            // Record when this action's own slot will expire.
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        }
        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    }
}
我有一个 API,每分钟最多接受 20 个请求;超过之后,我需要等待 1 分钟才能再次查询它。我有一个项目列表(通常超过 1000 个),需要从 API 中查询每一项的详细信息。我最初的想法是使用 Partitioner 把列表划分为每组 20 个项目/请求,但很快发现 Partitioner 并不是这样工作的。我的第二个想法是在分区中添加一个 delay,但这也不是好主意:按我的理解,它会在每个请求之后都增加一次延迟,而这并不是我需要的;我需要的是在每个 Partition 之后只延迟一次。下面是我的代码:
/// <summary>
/// Runs <paramref name="body"/> for every element of <paramref name="source"/>
/// on at most <paramref name="degreeOfParallelism"/> concurrent workers and
/// collects all of the results.
/// </summary>
/// <remarks>
/// Each worker drains one partition produced by <see cref="Partitioner"/>. When
/// <paramref name="delay"/> is positive, the worker pauses for that many seconds
/// after every item it processes. The pause is per item, not per batch.
/// The results are concatenated partition by partition, so their order may not
/// match the order of <paramref name="source"/>.
/// </remarks>
/// <param name="source">The items to process.</param>
/// <param name="degreeOfParallelism">The number of partitions (concurrent workers).</param>
/// <param name="body">The asynchronous operation to run for each item.</param>
/// <param name="token">Checked before each item and observed during each pause.</param>
/// <param name="delay">Seconds to wait after each item. Zero or negative means no wait.</param>
/// <returns>The results of all invocations of <paramref name="body"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
/// <exception cref="OperationCanceledException"><paramref name="token"/> was canceled.</exception>
public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));

    // Task.Delay rejects negative spans (other than -1 ms), so a non-positive
    // delay is treated as "no pause" instead of failing inside a worker.
    var pause = delay > 0 ? TimeSpan.FromSeconds(delay) : TimeSpan.Zero;

    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate
        {
            var allResponses = new List<V>();
            using (partition)
            {
                while (partition.MoveNext())
                {
                    token.ThrowIfCancellationRequested();
                    allResponses.Add(await body(partition.Current));
                    if (pause > TimeSpan.Zero)
                    {
                        // Pass the token so a cancellation is not stuck behind a long pause.
                        await Task.Delay(pause, token);
                    }
                }
            }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}
有谁知道我怎样才能做到这一点?
下面是一个 RateLimiter 类,可用于限制异步操作的频率。它是“this”所链接的那个答案中 RateLimiter 类的简化实现。
/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
/// <remarks>
/// Each completed <see cref="WaitAsync"/> takes one of the
/// <c>maxActionsPerTimeUnit</c> slots. The slot is given back <c>timeUnit</c>
/// after it was taken. Call <see cref="Dispose"/> when finished, to cancel the
/// pending release timers.
/// </remarks>
public class RateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts;
    // Cached copy: reading _disposeCts.Token after the CTS is disposed would throw.
    private readonly CancellationToken _disposeToken;
    // Dedicated lock that serializes slot releases against Dispose.
    private readonly object _gate = new object();
    private bool _disposed;

    /// <summary>Creates a limiter that allows at most
    /// <paramref name="maxActionsPerTimeUnit"/> acquisitions per
    /// <paramref name="timeUnit"/>.</summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxActionsPerTimeUnit"/> is less than 1, or
    /// <paramref name="timeUnit"/> is negative or too large for Task.Delay.
    /// </exception>
    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
        _disposeCts = new CancellationTokenSource();
        _disposeToken = _disposeCts.Token;
    }

    /// <summary>
    /// Waits until a slot is free, takes it, and schedules its release after
    /// the time unit has elapsed.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait for a free slot.</param>
    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Deliberately fire-and-forget; disposal-cancellation is handled inside.
        _ = ScheduleSemaphoreReleaseAsync();
    }

    // Returns Task rather than being `async void`. An async void method reports
    // itself to the current SynchronizationContext (OperationStarted/Completed).
    // Under classic ASP.NET, for example, the request would then be treated as
    // having a pending asynchronous operation for a whole time unit.
    private async Task ScheduleSemaphoreReleaseAsync()
    {
        try { await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Disposed: stop waiting; the release below is skipped.
        lock (_gate) { if (!_disposed) _semaphore.Release(); }
    }

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    {
        lock (_gate)
        {
            if (_disposed) return;
            _semaphore.Dispose();
            _disposed = true;
            // Cancels every pending release timer.
            _disposeCts.Cancel();
            _disposeCts.Dispose();
        }
    }
}
用法示例:
// Example: download every URL while allowing at most 20 requests per one-minute window.
List<string> urls = GetUrls();
using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

// Each download first waits for a free slot in the limiter, then issues the request.
async Task<string> DownloadAsync(string url)
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}

string[] documents = await Task.WhenAll(urls.Select(DownloadAsync));
注意:我添加了一个 Dispose 方法,用于取消 RateLimiter 类内部发起的异步操作。使用完 RateLimiter 后应调用此方法;否则,挂起的异步操作除了会占用与活动 Task.Delay 任务相关的资源外,还会阻止 RateLimiter 被及时垃圾回收。
最初那个非常简单但有缺陷的实现,可以在本答案的第 2 个修订版(2nd revision)中找到。
下面再给出 RateLimiter 类的另一种更复杂的实现,它基于 Stopwatch 而不是 SemaphoreSlim。它的优点是不需要释放(即不必实现 IDisposable),因为它不会在后台启动隐藏的异步操作。缺点是 WaitAsync 方法不支持 CancellationToken 参数,而且由于实现较复杂,出错的可能性也更高。
/// <summary>
/// Limits the number of workflows that can start during the specified time
/// span, without starting any background operations (so it needs no disposal).
/// </summary>
/// <remarks>
/// For every granted action the queue holds the moment (on the internal
/// stopwatch) at which that action's slot expires: its scheduled start plus the
/// time unit. Entries are stored in the order the actions were granted.
/// </remarks>
public class RateLimiter
{
    private readonly Stopwatch _stopwatch;
    private readonly Queue<TimeSpan> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    /// <summary>Creates a limiter that allows at most
    /// <paramref name="maxActionsPerTimeUnit"/> actions per
    /// <paramref name="timeUnit"/>.</summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxActionsPerTimeUnit"/> is less than 1, or
    /// <paramref name="timeUnit"/> is negative or too large for Task.Delay.
    /// </exception>
    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Without this check, a value below 1 makes the first WaitAsync call
        // First() on an empty sequence and throw InvalidOperationException.
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    }

    /// <summary>
    /// Reserves the next available start time and returns a task that completes
    /// when the caller may start its action. The returned task is already
    /// completed when no waiting is needed.
    /// </summary>
    public Task WaitAsync()
    {
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        {
            var currentTimestamp = _stopwatch.Elapsed;
            // Drop the slots whose expiration time has already passed.
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                // The new action may start once the entry _maxActionsPerTimeUnit
                // positions before the end of the queue has expired.
                var refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            }
            // Record when this action's own slot will expire.
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        }
        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    }
}