如何在 C# 中进行并发 API 调用?
How to make concurrent API calls in C#?
我需要从抵押中提取自定义字段 API。
问题是总共有 11000 条记录,每个 API 请求需要 1 秒。我想找到一种异步和并行发送请求的方法,以提高效率。
我尝试遍历所有请求,然后让 Task.WaitAll()
等待对 return 的响应。我只收到两个响应,然后应用程序无限期地等待。
我首先为HttpClient
设置了静态class
public static class ApiHelper
{
public static HttpClient ApiClient { get; set; }
public static void InitializeClient()
{
ApiClient = new HttpClient();
ApiClient.DefaultRequestHeaders.Add("ContentType", "application/json");
}
}
我收集我的抵押贷款 ID 列表并遍历 API Post 调用
static public DataTable GetCustomFields(DataTable dt, List<string> cf, string auth)
{
//set auth header
ApiHelper.ApiClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", auth);
//format body
string jsonBody = JArray.FromObject(cf).ToString();
var content = new StringContent(jsonBody, Encoding.UTF8, "application/json");
var responses = new List<Task<string>>();
foreach (DataRow dr in dt.Rows)
{
string guid = dr["GUID"].ToString().Replace("{", "").Replace("}", ""); //remove {} from string
responses.Add(GetData(guid, content));
}
Task.WaitAll(responses.ToArray());
//some code here to process through the responses and return a datatable
return updatedDT;
}
每个 API 调用都需要 URL
中的抵押 ID (GUID)
async static Task<string> GetData(string guid, StringContent json)
{
string url = "https://api.elliemae.com/encompass/v1/loans/" + guid + "/fieldReader";
Console.WriteLine("{0} has started .....", guid);
using (HttpResponseMessage response = await ApiHelper.ApiClient.PostAsync(url, json))
{
if (response.IsSuccessStatusCode)
{
Console.WriteLine("{0} has returned response....", guid);
return await response.Content.ReadAsStringAsync();
}
else
{
Console.WriteLine(response.ReasonPhrase);
throw new Exception(response.ReasonPhrase);
}
}
}
我现在只测试 10 条记录并发送所有 10 个请求。
但我只收到两个回来。
结果为 here。
能否就发送并发 API 调用的正确方法向我提供建议?
所有 GetData
Task
都在使用相同的 HttpClient
单例实例。 HttpClient 不能同时为多个调用提供服务。最佳做法是使用 Pool
的 HttpClient 来确保没有任务同时访问同一个 HttpClient。
此外,小心在 Task 中抛出 exception
,它会在第一次抛出异常时停止 WaitAll()
解决方案 我已经 post 在这里编辑了整个项目:https://github.com/jonathanlarouche/Whosebug_58137212
此解决方案使用 [3] 的 max sized
池发送 25 个请求;
基本上,ApiHelper
包含一个 HttpClient
pool,使用通用 class ArrayPool<T>
。 您可以使用任何其他 Pooling 库,我只是想 post 一个 self-contained 解决方案.
建议的 ApiHelper 下面,这个 class 现在包含一个池和一个 Use
方法,它接收一个 Action
,一个来自在操作期间,池将是 "rented",然后它将通过 ArrayPool.Use
函数返回到池中。 Use
函数还接收 apiToken 以更改请求 Auth Header。
public static class ApiHelper
{
public static int PoolSize { get => apiClientPool.Size; }
private static ArrayPool<HttpClient> apiClientPool = new ArrayPool<HttpClient>(() => {
var apiClient = new HttpClient();
apiClient.DefaultRequestHeaders.Add("ContentType", "application/json");
return apiClient;
});
public static Task Use(string apiToken, Func<HttpClient, Task> action)
{
return apiClientPool.Use(client => {
client.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", apiToken);
return action(client);
});
}
}
GetData 函数。获取数据将接收 apiToken 并将等待 ApiHelper.Use
函数。 StringContent()
object 的新实例需要在此函数中完成,因为它不能在不同的 Http Post 调用中重复使用。
async static Task<string> GetData(string apiToken, Guid guid, string jsonBody)
{
string url = "https://api.elliemae.com/encompass/v1/loans/" + guid + "/fieldReader";
Console.WriteLine("{0} has started .....", guid);
string output = null;
await ApiHelper.Use(apiToken, (client) =>
{
var json = new StringContent(jsonBody, Encoding.UTF8, "application/json");
return client.PostAsync(url, json).ContinueWith(postTaskResult =>
{
return postTaskResult.Result.Content.ReadAsStringAsync().ContinueWith(s => {
output = s.Result;
return s;
});
});
});
Console.WriteLine("{0} has finished .....", guid);
return output;
}
数组池
public class ArrayPool<T>
{
public int Size { get => pool.Count(); }
public int maxSize = 3;
public int circulingObjectCount = 0;
private Queue<T> pool = new Queue<T>();
private Func<T> constructorFunc;
public ArrayPool(Func<T> constructorFunc) {
this.constructorFunc = constructorFunc;
}
public Task Use(Func<T, Task> action)
{
T item = GetNextItem(); //DeQueue the item
var t = action(item);
t.ContinueWith(task => pool.Enqueue(item)); //Requeue the item
return t;
}
private T GetNextItem()
{
//Create new object if pool is empty and not reached maxSize
if (pool.Count == 0 && circulingObjectCount < maxSize)
{
T item = constructorFunc();
circulingObjectCount++;
Console.WriteLine("Pool empty, adding new item");
return item;
}
//Wait for Queue to have at least 1 item
WaitForReturns();
return pool.Dequeue();
}
private void WaitForReturns()
{
long timeouts = 60000;
while (pool.Count == 0 && timeouts > 0) { timeouts--; System.Threading.Thread.Sleep(1); }
if(timeouts == 0)
{
throw new Exception("Wait timed-out");
}
}
}
我需要从抵押中提取自定义字段 API。 问题是总共有 11000 条记录,每个 API 请求需要 1 秒。我想找到一种异步和并行发送请求的方法,以提高效率。
我尝试遍历所有请求,然后让 Task.WaitAll()
等待对 return 的响应。我只收到两个响应,然后应用程序无限期地等待。
我首先为HttpClient
public static class ApiHelper
{
public static HttpClient ApiClient { get; set; }
public static void InitializeClient()
{
ApiClient = new HttpClient();
ApiClient.DefaultRequestHeaders.Add("ContentType", "application/json");
}
}
我收集我的抵押贷款 ID 列表并遍历 API Post 调用
static public DataTable GetCustomFields(DataTable dt, List<string> cf, string auth)
{
//set auth header
ApiHelper.ApiClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", auth);
//format body
string jsonBody = JArray.FromObject(cf).ToString();
var content = new StringContent(jsonBody, Encoding.UTF8, "application/json");
var responses = new List<Task<string>>();
foreach (DataRow dr in dt.Rows)
{
string guid = dr["GUID"].ToString().Replace("{", "").Replace("}", ""); //remove {} from string
responses.Add(GetData(guid, content));
}
Task.WaitAll(responses.ToArray());
//some code here to process through the responses and return a datatable
return updatedDT;
}
每个 API 调用都需要 URL
中的抵押 ID (GUID) async static Task<string> GetData(string guid, StringContent json)
{
string url = "https://api.elliemae.com/encompass/v1/loans/" + guid + "/fieldReader";
Console.WriteLine("{0} has started .....", guid);
using (HttpResponseMessage response = await ApiHelper.ApiClient.PostAsync(url, json))
{
if (response.IsSuccessStatusCode)
{
Console.WriteLine("{0} has returned response....", guid);
return await response.Content.ReadAsStringAsync();
}
else
{
Console.WriteLine(response.ReasonPhrase);
throw new Exception(response.ReasonPhrase);
}
}
}
我现在只测试 10 条记录并发送所有 10 个请求。 但我只收到两个回来。
结果为 here。
能否就发送并发 API 调用的正确方法向我提供建议?
所有 GetData
Task
都在使用相同的 HttpClient
单例实例。 HttpClient 不能同时为多个调用提供服务。最佳做法是使用 Pool
的 HttpClient 来确保没有任务同时访问同一个 HttpClient。
此外,小心在 Task 中抛出 exception
,它会在第一次抛出异常时停止 WaitAll()
解决方案 我已经 post 在这里编辑了整个项目:https://github.com/jonathanlarouche/Whosebug_58137212
此解决方案使用 [3] 的 max sized
池发送 25 个请求;
基本上,ApiHelper
包含一个 HttpClient
pool,使用通用 class ArrayPool<T>
。 您可以使用任何其他 Pooling 库,我只是想 post 一个 self-contained 解决方案.
建议的 ApiHelper 下面,这个 class 现在包含一个池和一个 Use
方法,它接收一个 Action
,一个来自在操作期间,池将是 "rented",然后它将通过 ArrayPool.Use
函数返回到池中。 Use
函数还接收 apiToken 以更改请求 Auth Header。
public static class ApiHelper
{
public static int PoolSize { get => apiClientPool.Size; }
private static ArrayPool<HttpClient> apiClientPool = new ArrayPool<HttpClient>(() => {
var apiClient = new HttpClient();
apiClient.DefaultRequestHeaders.Add("ContentType", "application/json");
return apiClient;
});
public static Task Use(string apiToken, Func<HttpClient, Task> action)
{
return apiClientPool.Use(client => {
client.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", apiToken);
return action(client);
});
}
}
GetData 函数。获取数据将接收 apiToken 并将等待 ApiHelper.Use
函数。 StringContent()
object 的新实例需要在此函数中完成,因为它不能在不同的 Http Post 调用中重复使用。
async static Task<string> GetData(string apiToken, Guid guid, string jsonBody)
{
string url = "https://api.elliemae.com/encompass/v1/loans/" + guid + "/fieldReader";
Console.WriteLine("{0} has started .....", guid);
string output = null;
await ApiHelper.Use(apiToken, (client) =>
{
var json = new StringContent(jsonBody, Encoding.UTF8, "application/json");
return client.PostAsync(url, json).ContinueWith(postTaskResult =>
{
return postTaskResult.Result.Content.ReadAsStringAsync().ContinueWith(s => {
output = s.Result;
return s;
});
});
});
Console.WriteLine("{0} has finished .....", guid);
return output;
}
数组池
public class ArrayPool<T>
{
public int Size { get => pool.Count(); }
public int maxSize = 3;
public int circulingObjectCount = 0;
private Queue<T> pool = new Queue<T>();
private Func<T> constructorFunc;
public ArrayPool(Func<T> constructorFunc) {
this.constructorFunc = constructorFunc;
}
public Task Use(Func<T, Task> action)
{
T item = GetNextItem(); //DeQueue the item
var t = action(item);
t.ContinueWith(task => pool.Enqueue(item)); //Requeue the item
return t;
}
private T GetNextItem()
{
//Create new object if pool is empty and not reached maxSize
if (pool.Count == 0 && circulingObjectCount < maxSize)
{
T item = constructorFunc();
circulingObjectCount++;
Console.WriteLine("Pool empty, adding new item");
return item;
}
//Wait for Queue to have at least 1 item
WaitForReturns();
return pool.Dequeue();
}
private void WaitForReturns()
{
long timeouts = 60000;
while (pool.Count == 0 && timeouts > 0) { timeouts--; System.Threading.Thread.Sleep(1); }
if(timeouts == 0)
{
throw new Exception("Wait timed-out");
}
}
}