Create async method for queued timer based HTTP call

I have a class that holds a queue of requests. The requests are collected and sent to a Web API via an HTTP call after an interval of at most 1 second:

    public class AsyncValueTimerIntervalWriter
    {
        private class ValueRequest
        {
            public string FullName { get; set; }
            public object Value { get; set; }
        }

        private readonly IValuesClient _valuesClient; // auto generated Swagger HTTP client

        private List<ValueRequest> _valueRequests = new List<ValueRequest>();
        private object _valuesLock = new object();

        private Timer _timer;

        public AsyncValueTimerIntervalWriter(IValuesClient valuesClient)
        {
            _valuesClient = valuesClient;
        }

        public void Start() 
        {
            _timer = new Timer(o => WriteValuesToServer(), null, 0, 1000);
        }

        public void Stop() 
        {
            _timer?.Dispose();
            _timer = null;
        }

        public void AddWrite(string fullName, object value)
        {
            lock (_valuesLock)
            {
                _valueRequests.Add(new ValueRequest { FullName = fullName, Value = value });
            }
        }

        private async void WriteValuesToServer()
        {
            IList<ValueRequest> values;

            lock (_valuesLock)
            {
                values = _valueRequests.ToArray();
                _valueRequests.Clear();
            }

            if (values.Any())
            {
                await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
            }
        }
    }

Example usage:

    var asyncWriter = new AsyncValueTimerIntervalWriter(...);
    asyncWriter.AddWrite("My.Var.Tree.VarName", 1234);
    asyncWriter.AddWrite("My.Var.Tree.AnotherVar", "Test");

    // after max 1 sec the values are written to server

My goal is to write an async method that also queues a value to be written, and returns only once that value has been written:

    await asyncWriter.WriteAsync("My.Var.Tree.VarName", 1234);
    // should continue after written to server

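Important: I need to process the requests through a queue, because the writer may be stopped at any time and no request may be lost. Once the writer is started again, the requests added in the meantime must be sent to the server.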

I tried using a ManualResetEvent, but it feels strange:

    ...
    public Task WriteAsync(string fullName, object value)
    {
        var resetEvent = new ManualResetEvent(false);

        lock (_valuesLock)
        {
            _valueRequests.Add(
                new ValueRequest 
                { 
                    FullName = fullName, 
                    Value = value, 
                    CompletedEvent = resetEvent 
                });
        }

        resetEvent.WaitOne();

        return Task.CompletedTask;
    }

    private async void WriteValuesToServer()
    {
        IList<ValueRequest> values;

        lock (_valuesLock)
        {
            values = _valueRequests.ToArray();
            _valueRequests.Clear();
        }

        if (values.Any())
        {
            await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request

            foreach (var value in values)
                value.CompletedEvent?.Set();
        }
    }
    ...

Any suggestions?

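You can use a TaskCompletionSource inside the ValueRequest class to pass the completion signal from the writer to the caller.

    private class ValueRequest
    {
        public string FullName { get; set; }
        public object Value { get; set; }

        // The non-generic TaskCompletionSource requires .NET 5 or later;
        // on older frameworks use TaskCompletionSource<bool> instead.
        private readonly TaskCompletionSource _tcs = new TaskCompletionSource();

        public Task AwaitCompletion()
        {
            return _tcs.Task;
        }

        // SetResult returns void, so this method does too.
        public void MarkComplete()
        {
            _tcs.SetResult();
        }
    }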

A small change to WriteValuesToServer:

    public async Task WriteValuesToServer()
    {
        // snip 
        if (values.Any())
        {
            await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request

            // Signal every caller waiting on this batch.
            foreach (var value in values)
                value.MarkComplete();
        }
    }

Now your write method is simple:

    public async Task WriteAsync(string fullName, object value)
    {
        var request = new ValueRequest { FullName = fullName, Value = value };

        lock (_valuesLock)
        {
            _valueRequests.Add(request);
        }

        // Completes once the timer callback has sent the batch containing this request.
        await request.AwaitCompletion();
    }
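
Putting the pieces together, a minimal end-to-end sketch might look like the following. The `IValuesClient` interface and `FakeValuesClient` here are hypothetical stand-ins for your generated Swagger client (only the method shape is assumed), and `MarkFailed` is my own addition so that callers do not wait forever when the HTTP call throws. It uses `TaskCompletionSource<bool>` so it also compiles on frameworks older than .NET 5.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the generated client; only the method shape is assumed.
public interface IValuesClient
{
    Task SetValuesAsync(IReadOnlyList<ValueRequest> values);
}

public class ValueRequest
{
    // RunContinuationsAsynchronously keeps awaiting callers from resuming inline on the timer thread.
    private readonly TaskCompletionSource<bool> _tcs =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    public string FullName { get; set; }
    public object Value { get; set; }

    public Task Completion => _tcs.Task;
    public void MarkComplete() => _tcs.TrySetResult(true);
    public void MarkFailed(Exception ex) => _tcs.TrySetException(ex);
}

public class AsyncValueTimerIntervalWriter
{
    private readonly IValuesClient _valuesClient;
    private readonly List<ValueRequest> _valueRequests = new List<ValueRequest>();
    private readonly object _valuesLock = new object();
    private Timer _timer;

    public AsyncValueTimerIntervalWriter(IValuesClient valuesClient) => _valuesClient = valuesClient;

    // Fire immediately, then once per second; the returned Task is deliberately discarded.
    public void Start() => _timer = new Timer(state => { _ = WriteValuesToServerAsync(); }, null, 0, 1000);

    public void Stop()
    {
        _timer?.Dispose();
        _timer = null;
    }

    // Queues the value and returns a Task that completes when its batch has been sent.
    public Task WriteAsync(string fullName, object value)
    {
        var request = new ValueRequest { FullName = fullName, Value = value };
        lock (_valuesLock) { _valueRequests.Add(request); }
        return request.Completion;
    }

    public async Task WriteValuesToServerAsync()
    {
        ValueRequest[] batch;
        lock (_valuesLock)
        {
            batch = _valueRequests.ToArray();
            _valueRequests.Clear();
        }
        if (batch.Length == 0) return;

        try
        {
            await _valuesClient.SetValuesAsync(batch);
            foreach (var request in batch) request.MarkComplete();
        }
        catch (Exception ex)
        {
            // Assumption: surface the failure to the callers instead of re-queuing the batch.
            foreach (var request in batch) request.MarkFailed(ex);
        }
    }
}

// Fake client for the demo: counts calls and simulates network latency.
public class FakeValuesClient : IValuesClient
{
    public int Calls;
    public Task SetValuesAsync(IReadOnlyList<ValueRequest> values)
    {
        Calls++;
        return Task.Delay(50);
    }
}

public static class Program
{
    public static async Task Main()
    {
        var writer = new AsyncValueTimerIntervalWriter(new FakeValuesClient());

        // Queued while the writer is stopped: the task stays pending until Start().
        Task pending = writer.WriteAsync("My.Var.Tree.VarName", 1234);
        writer.Start();
        await pending;
        writer.Stop();
        Console.WriteLine("written to server");
    }
}
```

Because the request sits in the list until a timer tick picks it up, a value added while the writer is stopped is sent after the next `Start()`. If a failed batch must not be lost either, you could put the requests back into the list in the `catch` block instead of calling `MarkFailed`; which of the two is right depends on your requirements.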

In addition, I suggest you consider using BlockingCollection, which is designed for producer/consumer queues and would let you get rid of most of the lock blocks.
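
As a rough illustration of the BlockingCollection idea, the timer callback could drain whatever is currently queued with the non-blocking `TryTake`, with no explicit lock. `DrainBatch` is a hypothetical helper name of my own, and strings stand in for the request objects to keep the sketch short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BlockingCollectionSketch
{
    // Removes everything currently queued without blocking and without an explicit lock.
    public static List<T> DrainBatch<T>(BlockingCollection<T> queue)
    {
        var batch = new List<T>();
        while (queue.TryTake(out T item))
        {
            batch.Add(item);
        }
        return batch;
    }

    public static void Main()
    {
        using (var queue = new BlockingCollection<string>())
        {
            // Producers can call Add from any thread.
            queue.Add("My.Var.Tree.VarName");
            queue.Add("My.Var.Tree.AnotherVar");

            List<string> batch = DrainBatch(queue);
            Console.WriteLine(batch.Count); // 2
            Console.WriteLine(queue.Count); // 0
        }
    }
}
```

Items added while a drain is in progress either land in the current batch or stay queued for the next tick, so nothing is dropped.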