并行分发请求
Distributing Requests In Parallel
我喜欢那个场景:
我有一个端点,这个端点会将请求保存在内存中的列表或队列中,它会return立即成功响应给消费者。这个要求很关键,消费者不应该等待响应,如果需要,它会从不同的端点获得响应。因此,此端点必须在将请求消息保存在内存中后尽快return。
另一个线程会将这些请求分发到其他端点并将响应也保存在内存中。
到目前为止我做了什么:
我创建了一个控制器api来将这些请求保存在内存中。我将它们保存在如下静态请求列表中:
public static class RequestList
{
public static event EventHandler<RequestEventArgs> RequestReceived;
private static List<DistributionRequest> Requests { get; set; } = new List<DistributionRequest>();
public static int RequestCount { get => RequestList.Requests.Count; }
public static DistributionRequest Add(DistributionRequest request)
{
request.RequestId = Guid.NewGuid().ToString();
RequestList.Requests.Add(request);
OnRequestReceived(new RequestEventArgs { Request = request });
return request;
}
public static bool Remove(DistributionRequest request) => Requests.Remove(request);
private static void OnRequestReceived(RequestEventArgs e)
{
RequestReceived?.Invoke(null, e);
}
}
public class RequestEventArgs : EventArgs
{
public DistributionRequest Request { get; set; }
}
另一个 class 订阅了该静态 class 中存在的那个事件,我正在创建一个新线程来发出一些后台 Web 请求,以便能够实现 2. 我的项目如上所述。
private void RequestList_RequestReceived(object sender, RequestEventArgs e)
{
_logger.LogInformation($"Request Id: {e.Request.RequestId}, New request received");
Task.Factory.StartNew(() => Distribute(e.Request));
_logger.LogInformation($"Request Id: {e.Request.RequestId}, New task created for the new request");
//await Distribute(e.Request);
}
public async Task<bool> Distribute(DistributionRequest request)
{
//Some logic running here to send post request to different endpoints
//and to save results in memory
}
这是我的控制器方法:
[HttpPost]
public IActionResult Post([FromForm] DistributionRequest request)
{
var response = RequestList.Add(request);
return Ok(new DistributionResponse { Succeeded = true, RequestId = response.RequestId });
}
我尝试了这种方法,但它没有像我预期的那样工作,它应该在毫秒内 return 因为我不是在等待响应,但它似乎在等待什么,并且在每个请求之后等待时间是递增如下:
我做错了什么?或者你有更好的主意吗?我怎样才能实现我的目标?
根据您的示例代码,我尝试在不使用“事件”的情况下实现它。因此我得到了更好的请求时间。我不能说这是否与您的实施或事件本身有关,为此您必须进行分析。
我是这样做的
请求控制器
就像您在示例中所做的那样。接受请求并将其添加到请求列表中。
[Route("requests")]
public class RequestsController : ControllerBase
{
private readonly RequestManager _mgr;
public RequestsController(RequestManager mgr)
{
_mgr = mgr;
}
[HttpPost]
public IActionResult AddRequest([FromBody] DistributionRequest request)
{
var item = _mgr.Add(request);
return Accepted(new { Succeeded = true, RequestId = item.RequestId });
}
}
请求管理器
管理请求列表并将它们转发给某个分销商。
public class RequestManager
{
private readonly ILogger _logger;
private readonly RequestDistributor _distributor;
public IList<DistributionRequest> Requests { get; } = new List<DistributionRequest>();
public RequestManager(RequestDistributor distributor, ILogger<RequestManager> logger)
{
_distributor = distributor;
_logger = logger;
}
public DistributionRequest Add(DistributionRequest request)
{
_logger.LogInformation($"Request Id: {request.RequestId}, New request received");
/// Just add to the list of requests
Requests.Add(request);
/// Create and start a new task to distribute the request
/// forward it to the distributor.
/// Be sure to not add "await" here
Task.Factory.StartNew(() => _distributor.DistributeAsync(request));
_logger.LogInformation($"Request Id: {request.RequestId}, New task created for the new request");
return request;
}
}
RequestDistributor
分发逻辑可以在这里实现
public class RequestDistributor
{
public async Task DistributeAsync(DistributionRequest request)
{
/// do your distribution here
/// currently just a mocked time range
await Task.Delay(5);
}
}
接线
...将所有这些添加到您的依赖注入配置中
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddSingleton<RequestDistributor>();
services.AddSingleton<RequestManager>();
}
测试
使用此处提供的代码片段,我在不到 10 毫秒的时间内收到了所有请求。
备注
这只是一个示例,尝试始终向您的服务添加接口以使其可测试 ;)。
我喜欢那个场景:
我有一个端点,这个端点会将请求保存在内存中的列表或队列中,它会return立即成功响应给消费者。这个要求很关键,消费者不应该等待响应,如果需要,它会从不同的端点获得响应。因此,此端点必须在将请求消息保存在内存中后尽快return。
另一个线程会将这些请求分发到其他端点并将响应也保存在内存中。
到目前为止我做了什么:
我创建了一个控制器api来将这些请求保存在内存中。我将它们保存在如下静态请求列表中:
public static class RequestList
{
public static event EventHandler<RequestEventArgs> RequestReceived;
private static List<DistributionRequest> Requests { get; set; } = new List<DistributionRequest>();
public static int RequestCount { get => RequestList.Requests.Count; }
public static DistributionRequest Add(DistributionRequest request)
{
request.RequestId = Guid.NewGuid().ToString();
RequestList.Requests.Add(request);
OnRequestReceived(new RequestEventArgs { Request = request });
return request;
}
public static bool Remove(DistributionRequest request) => Requests.Remove(request);
private static void OnRequestReceived(RequestEventArgs e)
{
RequestReceived?.Invoke(null, e);
}
}
public class RequestEventArgs : EventArgs
{
public DistributionRequest Request { get; set; }
}
另一个 class 订阅了该静态 class 中存在的那个事件,我正在创建一个新线程来发出一些后台 Web 请求,以便能够实现 2. 我的项目如上所述。
private void RequestList_RequestReceived(object sender, RequestEventArgs e)
{
_logger.LogInformation($"Request Id: {e.Request.RequestId}, New request received");
Task.Factory.StartNew(() => Distribute(e.Request));
_logger.LogInformation($"Request Id: {e.Request.RequestId}, New task created for the new request");
//await Distribute(e.Request);
}
public async Task<bool> Distribute(DistributionRequest request)
{
//Some logic running here to send post request to different endpoints
//and to save results in memory
}
这是我的控制器方法:
[HttpPost]
public IActionResult Post([FromForm] DistributionRequest request)
{
var response = RequestList.Add(request);
return Ok(new DistributionResponse { Succeeded = true, RequestId = response.RequestId });
}
我尝试了这种方法,但它没有像我预期的那样工作,它应该在毫秒内 return 因为我不是在等待响应,但它似乎在等待什么,并且在每个请求之后等待时间是递增如下:
我做错了什么?或者你有更好的主意吗?我怎样才能实现我的目标?
根据您的示例代码,我尝试在不使用“事件”的情况下实现它。因此我得到了更好的请求时间。我不能说这是否与您的实施或事件本身有关,为此您必须进行分析。
我是这样做的
请求控制器
就像您在示例中所做的那样。接受请求并将其添加到请求列表中。
[Route("requests")]
public class RequestsController : ControllerBase
{
private readonly RequestManager _mgr;
public RequestsController(RequestManager mgr)
{
_mgr = mgr;
}
[HttpPost]
public IActionResult AddRequest([FromBody] DistributionRequest request)
{
var item = _mgr.Add(request);
return Accepted(new { Succeeded = true, RequestId = item.RequestId });
}
}
请求管理器
管理请求列表并将它们转发给某个分销商。
public class RequestManager
{
private readonly ILogger _logger;
private readonly RequestDistributor _distributor;
public IList<DistributionRequest> Requests { get; } = new List<DistributionRequest>();
public RequestManager(RequestDistributor distributor, ILogger<RequestManager> logger)
{
_distributor = distributor;
_logger = logger;
}
public DistributionRequest Add(DistributionRequest request)
{
_logger.LogInformation($"Request Id: {request.RequestId}, New request received");
/// Just add to the list of requests
Requests.Add(request);
/// Create and start a new task to distribute the request
/// forward it to the distributor.
/// Be sure to not add "await" here
Task.Factory.StartNew(() => _distributor.DistributeAsync(request));
_logger.LogInformation($"Request Id: {request.RequestId}, New task created for the new request");
return request;
}
}
RequestDistributor
分发逻辑可以在这里实现
public class RequestDistributor
{
public async Task DistributeAsync(DistributionRequest request)
{
/// do your distribution here
/// currently just a mocked time range
await Task.Delay(5);
}
}
接线
...将所有这些添加到您的依赖注入配置中
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddSingleton<RequestDistributor>();
services.AddSingleton<RequestManager>();
}
测试
使用此处提供的代码片段,我在不到 10 毫秒的时间内收到了所有请求。
备注
这只是一个示例,尝试始终向您的服务添加接口以使其可测试 ;)。