使用循环限制并发异步请求
Throttling concurrent async requests with loop
我目前正在努力向网络发出大量请求 API。我已经尝试 async
这个过程,以便我可以在合理的时间内完成,但是我无法限制连接,所以我发送的不会超过 10
requests/second.我正在使用信号量进行节流,但我不完全确定它在这种情况下如何工作,因为我有一个嵌套循环。
我实际上是在获取模型列表,每个模型中都有一个天数列表。我需要为模型中的每一天提出请求。天数可以是 1
到 50
之间的任何时间,99%
的时间只会是 1
。所以我想 async
每个模型,因为会有大约 3000
个模型,但我想 async
天,以防需要完成多天。我需要保持在 10
requests/second 或以下,所以我认为最好的方法是将整个操作的请求限制设置为 10。有没有一个地方可以放置信号量来限制整个链的连接?
每个单独的请求还必须对 2
不同的数据发出两次请求,并且此 API 目前不支持任何类型的批处理。
我是 c# 的新手,async
的新手和 WebRequests/HttpClient
的新手,因此非常感谢您的帮助。我试图在这里添加所有相关代码。如果您还需要什么,请告诉我。
public static async Task GetWeatherDataAsync(List<Model> models)
{
SemaphoreSlim semaphore = new SemaphoreSlim(10);
var taskList = new List<Task<ComparisonModel>>();
foreach (var x in models)
{
await semaphore.WaitAsync();
taskList.Add(CompDaysAsync(x));
}
try
{
await Task.WhenAll(taskList.ToArray());
}
catch (Exception e) { }
finally
{
semaphore.Release();
}
}
public static async Task<Models> CompDaysAsync(Model model)
{
var httpClient = new HttpClient();
httpClient.DefaultRequestHeaders.Authorization = new
Headers.AuthenticationHeaderValue("Token","xxxxxxxx");
httpClient.Timeout = TimeSpan.FromMinutes(5);
var taskList = new List<Task<Models.DateTemp>>();
foreach (var item in model.list)
{
taskList.Add(WeatherAPI.GetResponseForDayAsync(item,
httpClient, Latitude, Longitude));
}
httpClient.Dispose();
try
{
await Task.WhenAll(taskList.ToArray());
}
catch (Exception e) { }
return model;
}
public static async Task<DateTemp> GetResponseForDayAsync(DateTemp date, HttpClient httpClient, decimal? Latitude, decimal? Longitude)
{
var response = await httpClient.GetStreamAsync(request1);
StreamReader myStreamReader = new StreamReader(response);
string responseData = myStreamReader.ReadToEnd();
double[] data = new double[2];
if (responseData != "[[null, null]]")
{
data = Array.ConvertAll(responseData.Replace("[", "").Replace("]", "").Split(','), double.Parse);
}
else { data = null; };
double precipData = 0;
var response2 = await httpClient.GetStreamAsync(request2);
StreamReader myStreamReader2 = new StreamReader(response2);
string responseData2 = myStreamReader2.ReadToEnd();
if (responseData2 != null && responseData2 != "[null]" && responseData2 != "[0.0]")
{
precipData = double.Parse(responseData2.Replace("[", "").Replace("]", ""));
}
date.Precip = precipData;
if (data != null)
{
date.minTemp = data[0];
date.maxTemp = data[1];
}
return date;
}
我认为您完全不了解 SemaphoreSlim
的作用。
- 您的信号量是基于方法级别的局部变量,因此 每个
GetWeatherDataAsync
方法调用都会产生 10
调用您的 API , 无需等待其他客户端。
此外,如果 models.Count > 10
,您的代码将死锁,因为您在每次迭代中等待信号量,这些请求正在堆叠,对于 11th
,您的线程将挂起永远,因为你没有释放信号量:
var semaphore = new SemaphoreSlim(10);
foreach (var item in Enumerable.Range(0, 15))
{
// will stop after 9
await semaphore.WaitAsync();
Console.WriteLine(item);
}
你真正需要做的是将信号量移动到实例级别(或者甚至是带有static
关键字的类型级别),然后在里面GetWeatherDataAsync
,然后将 Release
放在 finally
块中。
至于 Parallel.Foreach
- 你不应该在这种情况下使用它,因为它不知道 async
方法(它是在 async/await
之前引入的),并且你的方法不看起来他们 CPU-bound.
我目前正在努力向网络发出大量请求 API。我已经尝试 async
这个过程,以便我可以在合理的时间内完成,但是我无法限制连接,所以我发送的不会超过 10
requests/second.我正在使用信号量进行节流,但我不完全确定它在这种情况下如何工作,因为我有一个嵌套循环。
我实际上是在获取模型列表,每个模型中都有一个天数列表。我需要为模型中的每一天提出请求。天数可以是 1
到 50
之间的任何时间,99%
的时间只会是 1
。所以我想 async
每个模型,因为会有大约 3000
个模型,但我想 async
天,以防需要完成多天。我需要保持在 10
requests/second 或以下,所以我认为最好的方法是将整个操作的请求限制设置为 10。有没有一个地方可以放置信号量来限制整个链的连接?
每个单独的请求还必须对 2
不同的数据发出两次请求,并且此 API 目前不支持任何类型的批处理。
我是 c# 的新手,async
的新手和 WebRequests/HttpClient
的新手,因此非常感谢您的帮助。我试图在这里添加所有相关代码。如果您还需要什么,请告诉我。
public static async Task GetWeatherDataAsync(List<Model> models)
{
SemaphoreSlim semaphore = new SemaphoreSlim(10);
var taskList = new List<Task<ComparisonModel>>();
foreach (var x in models)
{
await semaphore.WaitAsync();
taskList.Add(CompDaysAsync(x));
}
try
{
await Task.WhenAll(taskList.ToArray());
}
catch (Exception e) { }
finally
{
semaphore.Release();
}
}
public static async Task<Models> CompDaysAsync(Model model)
{
var httpClient = new HttpClient();
httpClient.DefaultRequestHeaders.Authorization = new
Headers.AuthenticationHeaderValue("Token","xxxxxxxx");
httpClient.Timeout = TimeSpan.FromMinutes(5);
var taskList = new List<Task<Models.DateTemp>>();
foreach (var item in model.list)
{
taskList.Add(WeatherAPI.GetResponseForDayAsync(item,
httpClient, Latitude, Longitude));
}
httpClient.Dispose();
try
{
await Task.WhenAll(taskList.ToArray());
}
catch (Exception e) { }
return model;
}
public static async Task<DateTemp> GetResponseForDayAsync(DateTemp date, HttpClient httpClient, decimal? Latitude, decimal? Longitude)
{
var response = await httpClient.GetStreamAsync(request1);
StreamReader myStreamReader = new StreamReader(response);
string responseData = myStreamReader.ReadToEnd();
double[] data = new double[2];
if (responseData != "[[null, null]]")
{
data = Array.ConvertAll(responseData.Replace("[", "").Replace("]", "").Split(','), double.Parse);
}
else { data = null; };
double precipData = 0;
var response2 = await httpClient.GetStreamAsync(request2);
StreamReader myStreamReader2 = new StreamReader(response2);
string responseData2 = myStreamReader2.ReadToEnd();
if (responseData2 != null && responseData2 != "[null]" && responseData2 != "[0.0]")
{
precipData = double.Parse(responseData2.Replace("[", "").Replace("]", ""));
}
date.Precip = precipData;
if (data != null)
{
date.minTemp = data[0];
date.maxTemp = data[1];
}
return date;
}
我认为您完全不了解 SemaphoreSlim
的作用。
- 您的信号量是基于方法级别的局部变量,因此 每个
GetWeatherDataAsync
方法调用都会产生10
调用您的 API , 无需等待其他客户端。 此外,如果
models.Count > 10
,您的代码将死锁,因为您在每次迭代中等待信号量,这些请求正在堆叠,对于11th
,您的线程将挂起永远,因为你没有释放信号量:var semaphore = new SemaphoreSlim(10); foreach (var item in Enumerable.Range(0, 15)) { // will stop after 9 await semaphore.WaitAsync(); Console.WriteLine(item); }
你真正需要做的是将信号量移动到实例级别(或者甚至是带有static
关键字的类型级别),然后在里面GetWeatherDataAsync
,然后将 Release
放在 finally
块中。
至于 Parallel.Foreach
- 你不应该在这种情况下使用它,因为它不知道 async
方法(它是在 async/await
之前引入的),并且你的方法不看起来他们 CPU-bound.