为基于排队计时器的 HTTP 调用创建异步方法
Create async method for queued timer based HTTP call
我有一个 class,它包含一个请求队列,这些请求将被收集并在最大 1 秒的时间间隔后通过 HTTP 调用发送到 Web API:
public class AsyncValueTimerIntervalWriter
{
private class ValueRequest
{
public string FullName { get; set; }
public object Value { get; set; }
}
private readonly IValuesClient _valuesClient; // auto generated Swagger HTTP client
private List<ValueRequest> _valueRequests = new List<ValueRequest>();
private object _valuesLock = new object();
private Timer _timer;
public AsyncValueTimerIntervalWriter(IValuesClient valuesClient)
{
_valuesClient = valuesClient;
}
public void Start()
{
_timer = new Timer(o => WriteValuesToServer(), null, 0, 1000);
}
public void Stop()
{
_timer?.Dispose();
_timer = null;
}
public void AddWrite(string fullName, object value)
{
lock (_valuesLock)
{
_valueRequests.Add(new ValueRequest { FullName = fullName, Value = value });
}
}
private async void WriteValuesToServer()
{
IList<ValueRequest> values;
lock (_valuesLock)
{
values = _valueRequests.ToArray();
_valueRequests.Clear();
}
if (values.Any())
{
await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
}
}
}
来电示例:
var asyncWriter = new AsyncValueTimerIntervalWriter(...);
asyncWriter.AddWrite("My.Var.Tree.VarName", 1234);
asyncWriter.AddWrite("My.Var.Tree.AnotherVar", "Test");
// after max 1 sec the values are written to server
我的目标是写一个async方法,里面也加了一个值来写,returns写值的时候:
await asyncWriter.WriteAsync("My.Var.Tree.VarName", 1234);
// should continue after written to server
重要提示:我需要在队列中处理请求,因为writer随时可能停止,不允许丢失请求。再次启动写入器后,需要将添加的请求发送到服务器。
我尝试使用ManualResetEvent,但感觉很奇怪:
...
public Task WriteAsync(string fullName, object value)
{
var resetEvent = new ManualResetEvent(false);
lock (_valuesLock)
{
_valueRequests.Add(
new ValueRequest
{
FullName = fullName,
Value = value,
CompletedEvent = resetEvent
});
}
resetEvent.WaitOne();
return Task.CompletedTask;
}
private async void WriteValuesToServer()
{
IList<ValueRequest> values;
lock (_valuesLock)
{
values = _valueRequests.ToArray();
_valueRequests.Clear();
}
if (values.Any())
{
await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
foreach (var value as values)
value.CompletedEvent?.Set();
}
}
...
有什么建议吗?
您可以在 ValueEntry class 中使用 TaskCompletionSource
将信号从编写者传递给调用者。
private class ValueEntry
{
public string FullName { get; set; }
public object Value { get; set; }
protected readonly TaskCompletionSource _tcs = new TaskCompleteionSource();
public Task AwaitCompletion()
{
return _tcs.Task;
}
public Task MarkComplete()
{
return _tcs.SetResult();
}
}
对 WriteValuesToServer 的小改动:
public async Task WriteValuesToServer()
{
// snip
if (values.Any())
{
await _emsClient.SetValuesAsync(values); // Sends HTTP POST request
foreach (var value as values)
await value.MarkComplete();
}
}
现在你的作者很简单:
public Task WriteAsync(string fullName, object value)
{
var request = new ValueRequest { FullName = fullName, Value = value };
lock (_valuesLock)
{
_valueRequests.Add(request)
};
await request.AwaitCompletion();
}
此外,我建议您考虑使用 BlockingCollection,它旨在处理 producer/consumer 队列,并且可以让您摆脱大部分 lock
块.
我有一个 class,它包含一个请求队列,这些请求将被收集并在最大 1 秒的时间间隔后通过 HTTP 调用发送到 Web API:
public class AsyncValueTimerIntervalWriter
{
private class ValueRequest
{
public string FullName { get; set; }
public object Value { get; set; }
}
private readonly IValuesClient _valuesClient; // auto generated Swagger HTTP client
private List<ValueRequest> _valueRequests = new List<ValueRequest>();
private object _valuesLock = new object();
private Timer _timer;
public AsyncValueTimerIntervalWriter(IValuesClient valuesClient)
{
_valuesClient = valuesClient;
}
public void Start()
{
_timer = new Timer(o => WriteValuesToServer(), null, 0, 1000);
}
public void Stop()
{
_timer?.Dispose();
_timer = null;
}
public void AddWrite(string fullName, object value)
{
lock (_valuesLock)
{
_valueRequests.Add(new ValueRequest { FullName = fullName, Value = value });
}
}
private async void WriteValuesToServer()
{
IList<ValueRequest> values;
lock (_valuesLock)
{
values = _valueRequests.ToArray();
_valueRequests.Clear();
}
if (values.Any())
{
await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
}
}
}
来电示例:
var asyncWriter = new AsyncValueTimerIntervalWriter(...);
asyncWriter.AddWrite("My.Var.Tree.VarName", 1234);
asyncWriter.AddWrite("My.Var.Tree.AnotherVar", "Test");
// after max 1 sec the values are written to server
我的目标是写一个async方法,里面也加了一个值来写,returns写值的时候:
await asyncWriter.WriteAsync("My.Var.Tree.VarName", 1234);
// should continue after written to server
重要提示:我需要在队列中处理请求,因为writer随时可能停止,不允许丢失请求。再次启动写入器后,需要将添加的请求发送到服务器。
我尝试使用ManualResetEvent,但感觉很奇怪:
...
public Task WriteAsync(string fullName, object value)
{
var resetEvent = new ManualResetEvent(false);
lock (_valuesLock)
{
_valueRequests.Add(
new ValueRequest
{
FullName = fullName,
Value = value,
CompletedEvent = resetEvent
});
}
resetEvent.WaitOne();
return Task.CompletedTask;
}
private async void WriteValuesToServer()
{
IList<ValueRequest> values;
lock (_valuesLock)
{
values = _valueRequests.ToArray();
_valueRequests.Clear();
}
if (values.Any())
{
await _valuesClient.SetValuesAsync(values); // Sends HTTP POST request
foreach (var value as values)
value.CompletedEvent?.Set();
}
}
...
有什么建议吗?
您可以在 ValueEntry class 中使用 TaskCompletionSource
将信号从编写者传递给调用者。
private class ValueEntry
{
public string FullName { get; set; }
public object Value { get; set; }
protected readonly TaskCompletionSource _tcs = new TaskCompleteionSource();
public Task AwaitCompletion()
{
return _tcs.Task;
}
public Task MarkComplete()
{
return _tcs.SetResult();
}
}
对 WriteValuesToServer 的小改动:
public async Task WriteValuesToServer()
{
// snip
if (values.Any())
{
await _emsClient.SetValuesAsync(values); // Sends HTTP POST request
foreach (var value as values)
await value.MarkComplete();
}
}
现在你的作者很简单:
public Task WriteAsync(string fullName, object value)
{
var request = new ValueRequest { FullName = fullName, Value = value };
lock (_valuesLock)
{
_valueRequests.Add(request)
};
await request.AwaitCompletion();
}
此外,我建议您考虑使用 BlockingCollection,它旨在处理 producer/consumer 队列,并且可以让您摆脱大部分 lock
块.