如何为 Azure 存储队列创建依赖注入以处理提供的每条消息
How to create dependency injection for Azure Storage queues to process each message as it is provided
正在尝试从某个其他服务写入的 Azure 队列中读取。如果我在 startup.cs
中使用它
CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=*;AccountKey=*;EndpointSuffix=*");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("*");
queue.CreateIfNotExists();
var message= queue.GetMessage();
我可以在 'message' 变量中获取消息,但是如何在启动时注入它,以便每次队列中有新消息时我的处理器 class 都会调用该消息。我试图通过
添加一个单例
services.AddSinleton<ProcessorClassInterface>(x=> {return new ProcessorClass(queue)});
然后每隔 1 秒调用 queue.GetMessage
。
这是通过调用一个函数解决的,该函数使用多线程在指定的时间间隔后轮询 azure 队列并获取消息(可能设置指数退避时间)。
方法一:
在 webapp 中实现这个有点棘手,我不得不使用 hack - 从构造函数调用函数来开始轮询。
在startup.cs(配置函数内部)中,注册您的服务,
app.ApplicationServices.GetService<IQueueConsumer>();
在 ConfigureServices 函数中,配置和创建轮询队列的对象 class,
services.TryAddTransient<IQueueConsumer>(sp => this.GetQueueProcessor(sp));
然后,当调用构造函数创建对象时,开始在不同的线程中轮询队列。
public QueuePollingFunction(
IOptions<QueueOptions> queueOptions,
CloudQueue queue)
{
this.isEnabled = queueOptions.Value.IsEnabled;
this.StartPollingQueue(queue);
}
public override async Task<bool> ProcessMessageAsync(string message)
{
bool result = false;
try
{
var messageContent = JsonConvert.DeserializeObject<QueueEntity>(message);
result = true;
}
catch (Exception e)
{
Trace.TraceError(e.ToString());
}
return result;
}
private async Task StartPollingQueue(CloudQueue queue)
{
if (this.isEnabled)
{
Task pollQueue = Task.Factory.StartNew(() => Parallel.For(0, this.numberOfParallelTasks, work =>
{
this.Start(queue);
}));
}
}
private async Task Start(CloudQueue queue)
{
while (true)
{
try
{
CloudQueueMessage retrievedMessage = await queue.GetMessageAsync();
if (retrievedMessage != null)
{
// Fail Logic
if (retrievedMessage.DequeueCount > this.maxRetryLimit)
{
await queue.DeleteMessageAsync(retrievedMessage);
}
bool isPass = await this.ProcessMessageAsync(newChannelSettings);
if (isPass)
{
await queue.DeleteMessageAsync(retrievedMessage);
}
}
else
{
// If queue is empty, then the Task can sleep for sleepTime duration
await Task.Delay(this.sleepTime);
}
}
catch (Exception e)
{
Trace.TraceError(e.ToString());
}
}
}
方法二:
然而,后来不得不转向最优方法,即使用 worker-roles 然后使用 Tasks 到 运行 一个后台线程来执行此任务。
正在尝试从某个其他服务写入的 Azure 队列中读取。如果我在 startup.cs
中使用它CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=*;AccountKey=*;EndpointSuffix=*");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("*");
queue.CreateIfNotExists();
var message= queue.GetMessage();
我可以在 'message' 变量中获取消息,但是如何在启动时注入它,以便每次队列中有新消息时我的处理器 class 都会调用该消息。我试图通过
添加一个单例services.AddSinleton<ProcessorClassInterface>(x=> {return new ProcessorClass(queue)});
然后每隔 1 秒调用 queue.GetMessage
。
这是通过调用一个函数解决的,该函数使用多线程在指定的时间间隔后轮询 azure 队列并获取消息(可能设置指数退避时间)。
方法一: 在 webapp 中实现这个有点棘手,我不得不使用 hack - 从构造函数调用函数来开始轮询。
在startup.cs(配置函数内部)中,注册您的服务,
app.ApplicationServices.GetService<IQueueConsumer>();
在 ConfigureServices 函数中,配置和创建轮询队列的对象 class,
services.TryAddTransient<IQueueConsumer>(sp => this.GetQueueProcessor(sp));
然后,当调用构造函数创建对象时,开始在不同的线程中轮询队列。
public QueuePollingFunction(
IOptions<QueueOptions> queueOptions,
CloudQueue queue)
{
this.isEnabled = queueOptions.Value.IsEnabled;
this.StartPollingQueue(queue);
}
public override async Task<bool> ProcessMessageAsync(string message)
{
bool result = false;
try
{
var messageContent = JsonConvert.DeserializeObject<QueueEntity>(message);
result = true;
}
catch (Exception e)
{
Trace.TraceError(e.ToString());
}
return result;
}
private async Task StartPollingQueue(CloudQueue queue)
{
if (this.isEnabled)
{
Task pollQueue = Task.Factory.StartNew(() => Parallel.For(0, this.numberOfParallelTasks, work =>
{
this.Start(queue);
}));
}
}
private async Task Start(CloudQueue queue)
{
while (true)
{
try
{
CloudQueueMessage retrievedMessage = await queue.GetMessageAsync();
if (retrievedMessage != null)
{
// Fail Logic
if (retrievedMessage.DequeueCount > this.maxRetryLimit)
{
await queue.DeleteMessageAsync(retrievedMessage);
}
bool isPass = await this.ProcessMessageAsync(newChannelSettings);
if (isPass)
{
await queue.DeleteMessageAsync(retrievedMessage);
}
}
else
{
// If queue is empty, then the Task can sleep for sleepTime duration
await Task.Delay(this.sleepTime);
}
}
catch (Exception e)
{
Trace.TraceError(e.ToString());
}
}
}
方法二: 然而,后来不得不转向最优方法,即使用 worker-roles 然后使用 Tasks 到 运行 一个后台线程来执行此任务。