如何在创建特定数量的 tasks\Threads 后等待特定的时间?
How to wait for a specific amount of time after creating a specific number of tasks\Threads?
我有一个要求,我可以在一秒钟内击中 API 5 次。如果我必须发出总共 50 个请求,我想发出前 5 个请求并等待 1 秒,然后我可以用另一批 5 个请求点击 API。我尝试使用线程池以及并行任务库 For\Foreach 循环和任务 类 但我无法获得一个顺序计数器来告诉我已经创建了 5 个任务。
这是我正在尝试做的示例:
List<string> str = new List<string>();
for (int i = 0; i <= 100; i++)
{
str.Add(i.ToString());
}
Parallel.ForEach(str, new ParallelOptions { MaxDegreeOfParallelism = 5 },
(value, pls, index) =>
{
Console.WriteLine(value);// simulating method call
if (index + 1 == 5)
{
// need the main thread to sleep so next batch is
Thread.Sleep(1000);
}
});
也许:
while(true){
for(int i = 0; i < 5; i++)
Task.Run(() => { <API STUFF> });
Thread.Sleep(1000);
}
我不确定一直这样调用 task.run 是否有效。
如果已经限制为每秒 5 个,并行 运行 有多重要?这是尝试的不同视角(未经编译测试)。这个想法是限制每个,而不是限制一批。
foreach(string value in values)
{
const int alottedMilliseconds = 200;
Stopwatch timer = Stopwatch.StartNew();
// ...
timer.Stop();
int remainingMilliseconds = alottedMilliseconds - timer.ElapsedMilliseconds;
if(remainingMilliseconds > 0)
{
// replace with something more precise/thread friendly as needed.
Thread.Sleep(remainingMilliseconds);
}
}
或者本着你原来的要求。使用将列表分成 5 块的扩展方法扩展您的解决方案...
public static IEnumerable<List<T>> Partition<T>(this IList<T> source, Int32 size)
{
for (int i = 0; i < Math.Ceiling(source.Count / (Double)size); i++)
{
yield return new List<T>(source.Skip(size * i).Take(size));
}
}
利用此扩展在外循环中调用您的 Parallel.ForEach,然后在每个外循环结束时应用相同的计时器方法。像这样...
foreach(IEnumerable<string> batch in str.Partitition(5))
{
Stopwatch timer = Stopwatch.StartNew();
Parallel.ForEach(
batch,
new ParallelOptions { MaxDegreeOfParallelism = 5 },
(value, pls, index) =>
{
Console.WriteLine(value);// simulating method call
});
timer.Stop();
int remainingMilliseconds = 5000 - timer.ElapsedMilliseconds;
if(remainingMilliseconds > 0)
{
// replace with something more precise/thread friendly as needed.
Thread.Sleep(remainingMilliseconds);
}
}
由于您使用的是 .NET 4.0(并且希望您至少使用 VS2012),您可以使用 Microsoft.Bcl.Async
获得 async-await
功能。
一旦你这样做了,你就可以轻松地异步查询你的 API 端点(不需要额外的线程),并使用 AsyncSemaphore
(见下面的实现)来限制要求你同时做。
例如:
public readonly AsyncSemaphore = new AsyncSemaphore(5);
public readonly HttpClient httpClient = new HttpClient();
public async Task<string> LimitedQueryAsync(string url)
{
await semaphoreSlim.WaitAsync();
try
{
var response = await httpClient.GetAsync(url);
return response.Content.ReadAsStringAsync();
}
finally
{
semaphoreSlim.Release();
}
}
现在可以这样查询了:
public async Task DoQueryStuffAsync()
{
while (someCondition)
{
var results = await LimitedQueryAsync(url);
// do stuff with results
await Task.Delay(1000);
}
}
编辑:
正如@ScottChamberlain 正确指出的那样,SemaphoreSlim
在 .NET 4 中不可用。您可以改用 AsyncSemaphore
,如下所示:
public class AsyncSemaphore
{
private readonly static Task s_completed = Task.FromResult(true);
private readonly Queue<TaskCompletionSource<bool>> m_waiters =
new Queue<TaskCompletionSource<bool>>();
private int m_currentCount;
public AsyncSemaphore(int initialCount)
{
if (initialCount < 0)
{
throw new ArgumentOutOfRangeException("initialCount");
}
m_currentCount = initialCount;
}
public Task WaitAsync()
{
lock (m_waiters)
{
if (m_currentCount > 0)
{
--m_currentCount;
return s_completed;
}
else
{
var waiter = new TaskCompletionSource<bool>();
m_waiters.Enqueue(waiter);
return waiter.Task;
}
}
}
public void Release()
{
TaskCompletionSource<bool> toRelease = null;
lock (m_waiters)
{
if (m_waiters.Count > 0)
toRelease = m_waiters.Dequeue();
else
++m_currentCount;
}
if (toRelease != null)
toRelease.SetResult(true);
}
}
下面给出了两种方法。通过这两种方式,您都将获得所需的测试配置。 不仅代码简洁,而且实现无锁
1) 递归
您必须以每批 5 个请求的形式提出 50 个请求。这意味着以 1 秒的间隔总共有 10 批次的 5 个请求。定义实体,设:
HitAPI()
是调用 API 一次的 线程安全 方法;
InitiateBatch()
是启动一批 5 个线程来命中 API, 的方法
那么,示例实现可以是:
private void InitiateRecursiveHits(int batchCount)
{
return InitiateBatch(batchCount);
}
只要用 batchCount
= 10 调用上面的方法,它就会调用下面的代码..
private void InitiateBatch(int batchNumber)
{
if (batchNumber <= 0) return;
var hitsPerBatch = 5;
var thisBatchHits = new Task[hitsPerBatch];
for (int taskNumber = 1; taskNumber <= hitsPerBatch; taskNumber++)
thisBatchHits[taskNumber - 1] = Task.Run(HitAPI);
Task.WaitAll(thisBatchHits);
Thread.Sleep(1000); //To wait for 1 second before starting another batch of 5
InitiateBatch(batchNumber - 1);
return
}
2) 迭代
这比第一种方法简单。只需以迭代方式执行递归方法...
private void InitiateIterativeHits(int batchCount)
{
if (batchCount <= 0) return;
// It's good programming practice to leave your input variables intact so that
// they hold correct value throughout the execution
int desiredRuns = batchCount;
var hitsPerBatch = 5;
while (desiredRuns-- > 0)
{
var thisBatchHits = new Task[hitsPerBatch];
for (int taskNumber = 1; taskNumber <= hitsPerBatch; taskNumber++)
thisBatchHits[taskNumber - 1] = Task.Run(HitAPI);
Task.WaitAll(thisBatchHits);
Thread.Sleep(1000); //To wait for 1 second before starting another batch of 5
}
}
我会为此使用 Microsoft 的 Reactive Framework (NuGet "Rx-Main")。
这是它的样子:
var query =
Observable
.Range(0, 100)
.Buffer(5)
.Zip(Observable.Interval(TimeSpan.FromSeconds(1.0)), (ns, i) => ns)
.SelectMany(ns =>
ns
.ToObservable()
.SelectMany(n =>
Observable
.Start(() =>
{
/* call here */
Console.WriteLine(n);
return n;
})));
然后您将像这样处理结果:
var subscription =
query
.Subscribe(x =>
{
/* handle result here */
});
如果您需要在请求自然完成之前停止请求,您只需调用 subscription.Dispose();
。
漂亮、干净、明确。
我有一个要求,我可以在一秒钟内击中 API 5 次。如果我必须发出总共 50 个请求,我想发出前 5 个请求并等待 1 秒,然后我可以用另一批 5 个请求点击 API。我尝试使用线程池以及并行任务库 For\Foreach 循环和任务 类 但我无法获得一个顺序计数器来告诉我已经创建了 5 个任务。 这是我正在尝试做的示例:
List<string> str = new List<string>();
for (int i = 0; i <= 100; i++)
{
str.Add(i.ToString());
}
Parallel.ForEach(str, new ParallelOptions { MaxDegreeOfParallelism = 5 },
(value, pls, index) =>
{
Console.WriteLine(value);// simulating method call
if (index + 1 == 5)
{
// need the main thread to sleep so next batch is
Thread.Sleep(1000);
}
});
也许:
while(true){
for(int i = 0; i < 5; i++)
Task.Run(() => { <API STUFF> });
Thread.Sleep(1000);
}
我不确定一直这样调用 task.run 是否有效。
如果已经限制为每秒 5 个,并行 运行 有多重要?这是尝试的不同视角(未经编译测试)。这个想法是限制每个,而不是限制一批。
foreach(string value in values)
{
const int alottedMilliseconds = 200;
Stopwatch timer = Stopwatch.StartNew();
// ...
timer.Stop();
int remainingMilliseconds = alottedMilliseconds - timer.ElapsedMilliseconds;
if(remainingMilliseconds > 0)
{
// replace with something more precise/thread friendly as needed.
Thread.Sleep(remainingMilliseconds);
}
}
或者本着你原来的要求。使用将列表分成 5 块的扩展方法扩展您的解决方案...
public static IEnumerable<List<T>> Partition<T>(this IList<T> source, Int32 size)
{
for (int i = 0; i < Math.Ceiling(source.Count / (Double)size); i++)
{
yield return new List<T>(source.Skip(size * i).Take(size));
}
}
利用此扩展在外循环中调用您的 Parallel.ForEach,然后在每个外循环结束时应用相同的计时器方法。像这样...
foreach(IEnumerable<string> batch in str.Partitition(5))
{
Stopwatch timer = Stopwatch.StartNew();
Parallel.ForEach(
batch,
new ParallelOptions { MaxDegreeOfParallelism = 5 },
(value, pls, index) =>
{
Console.WriteLine(value);// simulating method call
});
timer.Stop();
int remainingMilliseconds = 5000 - timer.ElapsedMilliseconds;
if(remainingMilliseconds > 0)
{
// replace with something more precise/thread friendly as needed.
Thread.Sleep(remainingMilliseconds);
}
}
由于您使用的是 .NET 4.0(并且希望您至少使用 VS2012),您可以使用 Microsoft.Bcl.Async
获得 async-await
功能。
一旦你这样做了,你就可以轻松地异步查询你的 API 端点(不需要额外的线程),并使用 AsyncSemaphore
(见下面的实现)来限制要求你同时做。
例如:
public readonly AsyncSemaphore = new AsyncSemaphore(5);
public readonly HttpClient httpClient = new HttpClient();
public async Task<string> LimitedQueryAsync(string url)
{
await semaphoreSlim.WaitAsync();
try
{
var response = await httpClient.GetAsync(url);
return response.Content.ReadAsStringAsync();
}
finally
{
semaphoreSlim.Release();
}
}
现在可以这样查询了:
public async Task DoQueryStuffAsync()
{
while (someCondition)
{
var results = await LimitedQueryAsync(url);
// do stuff with results
await Task.Delay(1000);
}
}
编辑:
正如@ScottChamberlain 正确指出的那样,SemaphoreSlim
在 .NET 4 中不可用。您可以改用 AsyncSemaphore
,如下所示:
public class AsyncSemaphore
{
private readonly static Task s_completed = Task.FromResult(true);
private readonly Queue<TaskCompletionSource<bool>> m_waiters =
new Queue<TaskCompletionSource<bool>>();
private int m_currentCount;
public AsyncSemaphore(int initialCount)
{
if (initialCount < 0)
{
throw new ArgumentOutOfRangeException("initialCount");
}
m_currentCount = initialCount;
}
public Task WaitAsync()
{
lock (m_waiters)
{
if (m_currentCount > 0)
{
--m_currentCount;
return s_completed;
}
else
{
var waiter = new TaskCompletionSource<bool>();
m_waiters.Enqueue(waiter);
return waiter.Task;
}
}
}
public void Release()
{
TaskCompletionSource<bool> toRelease = null;
lock (m_waiters)
{
if (m_waiters.Count > 0)
toRelease = m_waiters.Dequeue();
else
++m_currentCount;
}
if (toRelease != null)
toRelease.SetResult(true);
}
}
下面给出了两种方法。通过这两种方式,您都将获得所需的测试配置。 不仅代码简洁,而且实现无锁
1) 递归
您必须以每批 5 个请求的形式提出 50 个请求。这意味着以 1 秒的间隔总共有 10 批次的 5 个请求。定义实体,设:
HitAPI()
是调用 API 一次的 线程安全 方法;InitiateBatch()
是启动一批 5 个线程来命中 API, 的方法
那么,示例实现可以是:
private void InitiateRecursiveHits(int batchCount)
{
return InitiateBatch(batchCount);
}
只要用 batchCount
= 10 调用上面的方法,它就会调用下面的代码..
private void InitiateBatch(int batchNumber)
{
if (batchNumber <= 0) return;
var hitsPerBatch = 5;
var thisBatchHits = new Task[hitsPerBatch];
for (int taskNumber = 1; taskNumber <= hitsPerBatch; taskNumber++)
thisBatchHits[taskNumber - 1] = Task.Run(HitAPI);
Task.WaitAll(thisBatchHits);
Thread.Sleep(1000); //To wait for 1 second before starting another batch of 5
InitiateBatch(batchNumber - 1);
return
}
2) 迭代
这比第一种方法简单。只需以迭代方式执行递归方法...
private void InitiateIterativeHits(int batchCount)
{
if (batchCount <= 0) return;
// It's good programming practice to leave your input variables intact so that
// they hold correct value throughout the execution
int desiredRuns = batchCount;
var hitsPerBatch = 5;
while (desiredRuns-- > 0)
{
var thisBatchHits = new Task[hitsPerBatch];
for (int taskNumber = 1; taskNumber <= hitsPerBatch; taskNumber++)
thisBatchHits[taskNumber - 1] = Task.Run(HitAPI);
Task.WaitAll(thisBatchHits);
Thread.Sleep(1000); //To wait for 1 second before starting another batch of 5
}
}
我会为此使用 Microsoft 的 Reactive Framework (NuGet "Rx-Main")。
这是它的样子:
var query =
Observable
.Range(0, 100)
.Buffer(5)
.Zip(Observable.Interval(TimeSpan.FromSeconds(1.0)), (ns, i) => ns)
.SelectMany(ns =>
ns
.ToObservable()
.SelectMany(n =>
Observable
.Start(() =>
{
/* call here */
Console.WriteLine(n);
return n;
})));
然后您将像这样处理结果:
var subscription =
query
.Subscribe(x =>
{
/* handle result here */
});
如果您需要在请求自然完成之前停止请求,您只需调用 subscription.Dispose();
。
漂亮、干净、明确。