在异步方法中使用 QueueClient.OnMessage 有意义吗?
Does using QueueClient.OnMessage inside an asynchronous method make sense?
我正在从异步方法 ConfigureConnectionString 调用异步方法 InsertOperation。我是否正确使用了 client.OnMessage 调用?我想异步处理队列中的消息,然后将它们存储到队列存储中。
private static async void ConfigureConnectionString()
{
var connectionString =
"myconnstring";
var queueName = "myqueue";
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));
CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
CloudTable table = tableClient.GetTableReference("test");
table.CreateIfNotExists();
Stopwatch sw = Stopwatch.StartNew();
await Task.Run(() => InsertOperation(connectionString, queueName, table));
sw.Stop();
Console.WriteLine("ElapsedTime " + sw.Elapsed.TotalMinutes + " minutes.");
}
private static async Task InsertOperation(string connectionString, string queueName, CloudTable table)
{
var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
client.OnMessage(message =>
{
var bodyJson = new StreamReader(message.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
var myMessage = JsonConvert.DeserializeObject<VerifyVariable>(bodyJson);
Console.WriteLine();
var VerifyVariableEntityObject = new VerifyVariableEntity()
{
ConsumerId = myMessage.ConsumerId,
Score = myMessage.Score,
PartitionKey = myMessage.ConsumerId,
RowKey = myMessage.Score
};
});
}
OnMessageAsync method 提供异步编程模型,它使我们能够异步处理消息。
client.OnMessageAsync(message =>
{
return Task.Factory.StartNew(() => ProcessMessage(message));
//you could perofrm table and queue storage in ProcessMessage method
}, options);
在不了解您想要实现的实际逻辑的情况下,您似乎没有正确使用 OnMessage。
OnMessage 是一种为长 运行 客户端设置队列客户端行为的方法。这是有道理的,例如,如果您的应用程序中有一个单例实例。在这种情况下,您要向客户端指定您希望如何处理放入队列中的任何消息。
但是,在您的示例中,您创建了客户端,设置了 OnMessage,并且没有保留客户端,因此它实际上没有完成任何事情。
我正在从异步方法 ConfigureConnectionString 调用异步方法 InsertOperation。我是否正确使用了 client.OnMessage 调用?我想异步处理队列中的消息,然后将它们存储到队列存储中。
private static async void ConfigureConnectionString()
{
var connectionString =
"myconnstring";
var queueName = "myqueue";
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString"));
CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
CloudTable table = tableClient.GetTableReference("test");
table.CreateIfNotExists();
Stopwatch sw = Stopwatch.StartNew();
await Task.Run(() => InsertOperation(connectionString, queueName, table));
sw.Stop();
Console.WriteLine("ElapsedTime " + sw.Elapsed.TotalMinutes + " minutes.");
}
private static async Task InsertOperation(string connectionString, string queueName, CloudTable table)
{
var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
client.OnMessage(message =>
{
var bodyJson = new StreamReader(message.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
var myMessage = JsonConvert.DeserializeObject<VerifyVariable>(bodyJson);
Console.WriteLine();
var VerifyVariableEntityObject = new VerifyVariableEntity()
{
ConsumerId = myMessage.ConsumerId,
Score = myMessage.Score,
PartitionKey = myMessage.ConsumerId,
RowKey = myMessage.Score
};
});
}
OnMessageAsync method 提供异步编程模型,它使我们能够异步处理消息。
client.OnMessageAsync(message =>
{
return Task.Factory.StartNew(() => ProcessMessage(message));
//you could perofrm table and queue storage in ProcessMessage method
}, options);
在不了解您想要实现的实际逻辑的情况下,您似乎没有正确使用 OnMessage。
OnMessage 是一种为长 运行 客户端设置队列客户端行为的方法。这是有道理的,例如,如果您的应用程序中有一个单例实例。在这种情况下,您要向客户端指定您希望如何处理放入队列中的任何消息。
但是,在您的示例中,您创建了客户端,设置了 OnMessage,并且没有保留客户端,因此它实际上没有完成任何事情。