从方法调用队列返回数据
Returning data from a queue of method calls
我正在尝试创建一个队列来限制和重试 API 对服务的调用。
一张粗制滥造的图:
队列需要接受多种类型的调用(我想我已经记下了),将方法调用及其参数的队列存储到API 库中。如果呼叫失败,则需要在队列顶部重试(已解决)。然后它需要 return 数据返回到进行原始调用的程序中的方法(或回调到另一个方法),即使该调用由于队列而延迟了很长一段时间。一直以来,不阻塞 UI.
除了我 return 将数据返回给原始调用者的那部分,我已经弄明白了大部分内容。或者至少是 class 中的一个方法,它将接受数据(如回调)。
我该怎么做?当可以从任意数量的 classes 进行调用时,我如何 return 将队列中的数据返回到程序的其余部分?如果我要利用回调,where/how 我是否将回调信息存储在队列中?
关于队列将做什么的另一个劣质图表:
下面的代码会给你一个想法。
// this is common practice for genertic classes like BaseApiRequest<T> -
// create parent class which does not have generic parameters
public abstract class BaseApiRequest : IDisposable {
public abstract void Dispose();
public abstract void SetException(Exception ex);
}
public abstract class BaseApiRequest<T> : BaseApiRequest {
private readonly ManualResetEventSlim _signal;
private Exception _exception;
private T _result;
protected BaseApiRequest() {
_signal = new ManualResetEventSlim(false);
}
public T GetResult() {
_signal.Wait();
if (_exception != null)
throw new Exception("Exception during request processing. See inner exception for details", _exception);
return _result;
}
public T GetResult(CancellationToken token) {
_signal.Wait(token);
if (_exception != null)
throw new Exception("Exception during request processing. See inner exception for details", _exception);
return _result;
}
public bool TryGetResult(TimeSpan timeout, out T result) {
result = default(T);
if (_signal.Wait(timeout)) {
if (_exception != null)
throw new Exception("Exception during request processing. See inner exception for details", _exception);
result = _result;
return true;
}
return false;
}
public void SetResult(T result) {
_result = result;
_signal.Set();
var handler = ResultReady;
if (handler != null)
handler(this, new ResultReadyEventArgs<T>(_result));
}
public override void SetException(Exception ex) {
_exception = ex;
_signal.Set();
var handler = ResultReady;
if (handler != null)
handler(this, new ResultReadyEventArgs<T>(_exception));
}
public override void Dispose() {
_signal.Dispose();
}
public event EventHandler<ResultReadyEventArgs<T>> ResultReady;
public class ResultReadyEventArgs<T> : EventArgs {
public ResultReadyEventArgs(T result) {
this.Result = result;
this.Success = true;
}
public ResultReadyEventArgs(Exception ex)
{
this.Exception = ex;
this.Success = false;
}
public bool Success { get; private set; }
public T Result { get; private set; }
public Exception Exception { get; private set; }
}
}
这是您 api 请求的可能基础 class。处理请求时,您的处理器会调用 SetResult。调用方创建请求,将其发布到您的队列,然后有选项:
调用方需要同步结果。然后他调用 GetResult,这是一个阻塞调用。如果他不想永远等待以防出现问题,调用者可以使用带超时的 TryGetResult。
如果不立即需要结果,调用者可以订阅 ResultReady 事件,或者稍后再调用 GetResult。
现在要处理您的请求,您有多种选择。一种是向 BaseApiRequest 添加句柄逻辑。然后,您从处理器中的队列中提取 BaseApiRequest,然后对其调用 Process。您可能会说请求不应包含处理自身的逻辑,因为它只是请求。然后考虑更复杂一点的class结构:
public interface IApiRequestHandler {
// type of request which is handled by this handler
Type RequestType { get; }
void Validate(BaseApiRequest request);
void Process(BaseApiRequest request);
}
// specific request
public class SomeDataRequest : BaseApiRequest<int> {
public string Argument1 { get; set; }
public long Argument2 { get; set; }
}
// specific request handler
public class SomeDataRequestHandler : IApiRequestHandler {
public Type RequestType { get { return typeof(SomeDataRequest); } }
public void Validate(BaseApiRequest baseRequest) {
// safe to cast here
var request = (SomeDataRequest) baseRequest;
// validate and throw exception if something is wrong
// no reason to validate when we already started processing
}
public void Process(BaseApiRequest baseRequest) {
// safe to cast here
var request = (SomeDataRequest) baseRequest;
// do processing
request.SetResult(1);
}
}
然后,您的 api 将如下所示:
// this should be singleton
public class Api : IDisposable {
private readonly BlockingCollection<BaseApiRequest> _requests = new BlockingCollection<BaseApiRequest>(new ConcurrentQueue<BaseApiRequest>());
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly Dictionary<Type, IApiRequestHandler> _handlers = new Dictionary<Type, IApiRequestHandler>();
public Api() {
// find or explicitly register handlers in some way
// here we just search them in current assembly
foreach (var type in Assembly.GetExecutingAssembly().GetTypes().Where(c => typeof (IApiRequestHandler).IsAssignableFrom(c))) {
var handler = (IApiRequestHandler) Activator.CreateInstance(type);
if (_handlers.ContainsKey(handler.RequestType))
throw new Exception(String.Format("Request handler for request type {0} already registered.", handler.RequestType));
_handlers.Add(handler.RequestType, handler);
}
new Thread(ProcessingLoop) {IsBackground = true}.Start();
}
private void ProcessingLoop() {
try {
foreach (var request in _requests.GetConsumingEnumerable(_cts.Token)) {
try {
// no casting from object or switches here
_handlers[request.GetType()].Process(request);
}
catch (Exception ex) {
request. SetException(ex);
}
}
}
catch (OperationCanceledException) {
return;
}
}
public void StartProcessing(BaseApiRequest request) {
if (_handlers.ContainsKey(request.GetType()))
throw new Exception("No handlers registered for request type " + request.GetType());
// validate synchronously
_handlers[request.GetType()].Validate(request);
_requests.Add(request);
}
public void Dispose() {
_cts.Cancel();
}
}
我正在尝试创建一个队列来限制和重试 API 对服务的调用。
一张粗制滥造的图:
队列需要接受多种类型的调用(我想我已经记下了),将方法调用及其参数的队列存储到API 库中。如果呼叫失败,则需要在队列顶部重试(已解决)。然后它需要 return 数据返回到进行原始调用的程序中的方法(或回调到另一个方法),即使该调用由于队列而延迟了很长一段时间。一直以来,不阻塞 UI.
除了我 return 将数据返回给原始调用者的那部分,我已经弄明白了大部分内容。或者至少是 class 中的一个方法,它将接受数据(如回调)。
我该怎么做?当可以从任意数量的 classes 进行调用时,我如何 return 将队列中的数据返回到程序的其余部分?如果我要利用回调,where/how 我是否将回调信息存储在队列中?
关于队列将做什么的另一个劣质图表:
下面的代码会给你一个想法。
// this is common practice for genertic classes like BaseApiRequest<T> -
// create parent class which does not have generic parameters
public abstract class BaseApiRequest : IDisposable {
public abstract void Dispose();
public abstract void SetException(Exception ex);
}
public abstract class BaseApiRequest<T> : BaseApiRequest {
private readonly ManualResetEventSlim _signal;
private Exception _exception;
private T _result;
protected BaseApiRequest() {
_signal = new ManualResetEventSlim(false);
}
public T GetResult() {
_signal.Wait();
if (_exception != null)
throw new Exception("Exception during request processing. See inner exception for details", _exception);
return _result;
}
public T GetResult(CancellationToken token) {
_signal.Wait(token);
if (_exception != null)
throw new Exception("Exception during request processing. See inner exception for details", _exception);
return _result;
}
public bool TryGetResult(TimeSpan timeout, out T result) {
result = default(T);
if (_signal.Wait(timeout)) {
if (_exception != null)
throw new Exception("Exception during request processing. See inner exception for details", _exception);
result = _result;
return true;
}
return false;
}
public void SetResult(T result) {
_result = result;
_signal.Set();
var handler = ResultReady;
if (handler != null)
handler(this, new ResultReadyEventArgs<T>(_result));
}
public override void SetException(Exception ex) {
_exception = ex;
_signal.Set();
var handler = ResultReady;
if (handler != null)
handler(this, new ResultReadyEventArgs<T>(_exception));
}
public override void Dispose() {
_signal.Dispose();
}
public event EventHandler<ResultReadyEventArgs<T>> ResultReady;
public class ResultReadyEventArgs<T> : EventArgs {
public ResultReadyEventArgs(T result) {
this.Result = result;
this.Success = true;
}
public ResultReadyEventArgs(Exception ex)
{
this.Exception = ex;
this.Success = false;
}
public bool Success { get; private set; }
public T Result { get; private set; }
public Exception Exception { get; private set; }
}
}
这是您 api 请求的可能基础 class。处理请求时,您的处理器会调用 SetResult。调用方创建请求,将其发布到您的队列,然后有选项:
调用方需要同步结果。然后他调用 GetResult,这是一个阻塞调用。如果他不想永远等待以防出现问题,调用者可以使用带超时的 TryGetResult。
如果不立即需要结果,调用者可以订阅 ResultReady 事件,或者稍后再调用 GetResult。
现在要处理您的请求,您有多种选择。一种是向 BaseApiRequest 添加句柄逻辑。然后,您从处理器中的队列中提取 BaseApiRequest,然后对其调用 Process。您可能会说请求不应包含处理自身的逻辑,因为它只是请求。然后考虑更复杂一点的class结构:
public interface IApiRequestHandler {
// type of request which is handled by this handler
Type RequestType { get; }
void Validate(BaseApiRequest request);
void Process(BaseApiRequest request);
}
// specific request
public class SomeDataRequest : BaseApiRequest<int> {
public string Argument1 { get; set; }
public long Argument2 { get; set; }
}
// specific request handler
public class SomeDataRequestHandler : IApiRequestHandler {
public Type RequestType { get { return typeof(SomeDataRequest); } }
public void Validate(BaseApiRequest baseRequest) {
// safe to cast here
var request = (SomeDataRequest) baseRequest;
// validate and throw exception if something is wrong
// no reason to validate when we already started processing
}
public void Process(BaseApiRequest baseRequest) {
// safe to cast here
var request = (SomeDataRequest) baseRequest;
// do processing
request.SetResult(1);
}
}
然后,您的 api 将如下所示:
// this should be singleton
public class Api : IDisposable {
private readonly BlockingCollection<BaseApiRequest> _requests = new BlockingCollection<BaseApiRequest>(new ConcurrentQueue<BaseApiRequest>());
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly Dictionary<Type, IApiRequestHandler> _handlers = new Dictionary<Type, IApiRequestHandler>();
public Api() {
// find or explicitly register handlers in some way
// here we just search them in current assembly
foreach (var type in Assembly.GetExecutingAssembly().GetTypes().Where(c => typeof (IApiRequestHandler).IsAssignableFrom(c))) {
var handler = (IApiRequestHandler) Activator.CreateInstance(type);
if (_handlers.ContainsKey(handler.RequestType))
throw new Exception(String.Format("Request handler for request type {0} already registered.", handler.RequestType));
_handlers.Add(handler.RequestType, handler);
}
new Thread(ProcessingLoop) {IsBackground = true}.Start();
}
private void ProcessingLoop() {
try {
foreach (var request in _requests.GetConsumingEnumerable(_cts.Token)) {
try {
// no casting from object or switches here
_handlers[request.GetType()].Process(request);
}
catch (Exception ex) {
request. SetException(ex);
}
}
}
catch (OperationCanceledException) {
return;
}
}
public void StartProcessing(BaseApiRequest request) {
if (_handlers.ContainsKey(request.GetType()))
throw new Exception("No handlers registered for request type " + request.GetType());
// validate synchronously
_handlers[request.GetType()].Validate(request);
_requests.Add(request);
}
public void Dispose() {
_cts.Cancel();
}
}