并行分发请求

Distributing Requests In Parallel

我喜欢那个场景:

  1. 我有一个端点,这个端点会将请求保存在内存中的列表或队列中,它会return立即成功响应给消费者。这个要求很关键,消费者不应该等待响应,如果需要,它会从不同的端点获得响应。因此,此端点必须在将请求消息保存在内存中后尽快return。

  2. 另一个线程会将这些请求分发到其他端点并将响应也保存在内存中。

到目前为止我做了什么:

我创建了一个控制器api来将这些请求保存在内存中。我将它们保存在如下静态请求列表中:

    public static class RequestList
    {
        public static event EventHandler<RequestEventArgs> RequestReceived;

        private static List<DistributionRequest> Requests { get; set; } = new List<DistributionRequest>();

        public static int RequestCount { get => RequestList.Requests.Count; }

        public static DistributionRequest Add(DistributionRequest request)
        {
            request.RequestId = Guid.NewGuid().ToString();
            RequestList.Requests.Add(request);
            OnRequestReceived(new RequestEventArgs { Request = request });
            return request;
        }

        public static bool Remove(DistributionRequest request) => Requests.Remove(request);

        private static void OnRequestReceived(RequestEventArgs e)
        {
            RequestReceived?.Invoke(null, e);
        }
    }

    public class RequestEventArgs : EventArgs
    {
        public DistributionRequest Request { get; set; }
    }

另一个 class 订阅了该静态 class 中存在的那个事件,我正在创建一个新线程来发出一些后台 Web 请求,以便能够实现 2. 我的项目如上所述。

        private void RequestList_RequestReceived(object sender, RequestEventArgs e)
        {
            _logger.LogInformation($"Request Id: {e.Request.RequestId}, New request received");
            Task.Factory.StartNew(() => Distribute(e.Request));
            _logger.LogInformation($"Request Id: {e.Request.RequestId}, New task created for the new request");
            //await Distribute(e.Request);
        }

        public async Task<bool> Distribute(DistributionRequest request)
        {

            //Some logic running here to send post request to different endpoints 
            //and to save results in memory
        }

这是我的控制器方法:

        [HttpPost]
        public IActionResult Post([FromForm] DistributionRequest request)
        {
            var response = RequestList.Add(request);
            return Ok(new DistributionResponse { Succeeded = true, RequestId = response.RequestId });
        }

我尝试了这种方法,但它没有像我预期的那样工作,它应该在毫秒内 return 因为我不是在等待响应,但它似乎在等待什么,并且在每个请求之后等待时间是递增如下:

我做错了什么?或者你有更好的主意吗?我怎样才能实现我的目标?

根据您的示例代码,我尝试在不使用“事件”的情况下实现它。因此我得到了更好的请求时间。我不能说这是否与您的实施或事件本身有关,为此您必须进行分析。

我是这样做的

请求控制器

就像您在示例中所做的那样。接受请求并将其添加到请求列表中。

[Route("requests")]
    public class RequestsController : ControllerBase
    {
        private readonly RequestManager _mgr;

        public RequestsController(RequestManager mgr)
        {
            _mgr = mgr;
        }

        [HttpPost]
        public IActionResult AddRequest([FromBody] DistributionRequest request)
        {
            var item = _mgr.Add(request);
            return Accepted(new { Succeeded = true, RequestId = item.RequestId });
        }
    }

请求管理器

管理请求列表并将它们转发给某个分销商。

public class RequestManager
    {
        private readonly ILogger _logger;
        private readonly RequestDistributor _distributor;

        public IList<DistributionRequest> Requests { get; } = new List<DistributionRequest>();

        public RequestManager(RequestDistributor distributor, ILogger<RequestManager> logger)
        {
            _distributor = distributor;
            _logger = logger;
        }

        public DistributionRequest Add(DistributionRequest request)
        {
            _logger.LogInformation($"Request Id: {request.RequestId}, New request received");
            /// Just add to the list of requests
            Requests.Add(request);
            /// Create and start a new task to distribute the request 
            /// forward it to the distributor.
            /// Be sure to not add "await" here
            Task.Factory.StartNew(() => _distributor.DistributeAsync(request));
            _logger.LogInformation($"Request Id: {request.RequestId}, New task created for the new request");

            return request;
        }
    }

RequestDistributor

分发逻辑可以在这里实现

public class RequestDistributor
    {
        public async Task DistributeAsync(DistributionRequest request)
        {
            /// do your distribution here
            /// currently just a mocked time range
            await Task.Delay(5);
        }
    }

接线

...将所有这些添加到您的依赖注入配置中

public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddSingleton<RequestDistributor>();
            services.AddSingleton<RequestManager>();
        }

测试

使用此处提供的代码片段,我在不到 10 毫秒的时间内收到了所有请求。

备注

这只是一个示例,尝试始终向您的服务添加接口以使其可测试 ;)。