如何从 Azure 服务总线读取消息
How to read message off azure service bus
我正在尝试使用 C# azure 函数从队列中读取消息(通知而不是轮询)。
我目前正在使用控制台应用程序将消息推送到队列中,请参见下文。
似乎有两种不同的产品,Azure 服务总线和存储帐户中的队列。下面的代码用的是后者,不知如何读取队列的消息?
非常感谢,
代码示例
StorageCredentials creds = new StorageCredentials(accountname, accountkey);
CloudStorageAccount account = new CloudStorageAccount(creds, useHttps: true);
CloudQueueClient queueClient = account.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("test-queue");
queue.CreateIfNotExists();
while (true)
{
CloudQueueMessage message = new CloudQueueMessage(JsonConvert.SerializeObject("my message"));
queue.AddMessage(message);
}
更新
在尝试提供帮助的专家建议的评论和链接之后,我尝试了以下代码示例将消息推送到 Azure 服务总线(即不是存储帐户中的队列)
namespace ConsoleApp1
{
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
class Program
{
const string ServiceBusConnectionString = "Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=xxx;SharedAccessKey=xxx";
const string QueueName = "name_of_queue";
static IQueueClient queueClient;
public static async Task Main(string[] args)
{
const int numberOfMessages = 10;
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
await SendMessagesAsync(numberOfMessages);
await queueClient.CloseAsync();
}
static async Task SendMessagesAsync(int numberOfMessagesToSend)
{
for (var i = 0; i < numberOfMessagesToSend; i++)
{
string messageBody = $"Message {i}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
Console.WriteLine($"Sending message: {messageBody}");
// Send the message to the queue
try
{
await queueClient.SendAsync(message); // this line
}
catch (Exception ex)
{
throw ex;
}
}
}
}
}
当我 运行 控制台上方的代码 window 不执行任何操作时,没有错误消息,什么也没有..!奇怪.. 查看 azure 服务总线概述,它的活动消息计数为零。
我正在使用这个示例项目,但 queueClient.SendAsync 从未 returns 返回。我是否需要在天蓝色中设置一些东西,也许是权限?
https://github.com/Azure/azure-service-bus
最终收到错误
A connection attempt failed because the connected party did not
properly respond after a period of time, or established connection
failed because connected host has failed to respond
我可以在门户服务总线屏幕中看到请求
这些是你想要的吗?
1.If 您想要从 Azure 服务总线队列获取消息:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
namespace ConsoleApp2
{
class Program
{
const string ServiceBusConnectionString = "xxxxxx";
const string QueueName = "xxx";
static IQueueClient queueClient;
public static async Task Main(string[] args)
{
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
RegisterOnMessageHandlerAndReceiveMessages();
Console.ReadKey();
await queueClient.CloseAsync();
}
static void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 10,
AutoComplete = false
};
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
}
在从服务总线队列接收消息之前,我已将消息放入队列中。
这是结果:
2.If 您想要从 Azure 存储队列获取消息:
using System;
using Microsoft.Azure;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
namespace ReceiveMessageFromAzureStorageQueue
{
class Program
{
static void Main(string[] args)
{
CloudStorageAccount storageAccount = CloudStorageAccount.Parse("xxxxxx");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("xxx");
CloudQueueMessage peekedMessage = queue.PeekMessage();
queue.FetchAttributes();
int? cachedMessageCount = queue.ApproximateMessageCount;
Console.WriteLine("Number of messages in queue: " + cachedMessageCount);
for(int i=0; i<cachedMessageCount; i++) {
System.Threading.Thread.Sleep(10);
CloudQueueMessage retrievedMessage = queue.GetMessage(TimeSpan.FromMilliseconds(10));
Console.WriteLine(retrievedMessage.AsString);
queue.DeleteMessage(retrievedMessage);
}
Console.WriteLine("Already Read.");
}
}
}
在接收之前仍然将消息放入 azure 存储队列中。
这是结果:
如有更多疑问,请告诉我。
我正在尝试使用 C# azure 函数从队列中读取消息(通知而不是轮询)。
我目前正在使用控制台应用程序将消息推送到队列中,请参见下文。
似乎有两种不同的产品,Azure 服务总线和存储帐户中的队列。下面的代码用的是后者,不知如何读取队列的消息?
非常感谢,
代码示例
StorageCredentials creds = new StorageCredentials(accountname, accountkey);
CloudStorageAccount account = new CloudStorageAccount(creds, useHttps: true);
CloudQueueClient queueClient = account.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("test-queue");
queue.CreateIfNotExists();
while (true)
{
CloudQueueMessage message = new CloudQueueMessage(JsonConvert.SerializeObject("my message"));
queue.AddMessage(message);
}
更新
在尝试提供帮助的专家建议的评论和链接之后,我尝试了以下代码示例将消息推送到 Azure 服务总线(即不是存储帐户中的队列)
namespace ConsoleApp1
{
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
class Program
{
const string ServiceBusConnectionString = "Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=xxx;SharedAccessKey=xxx";
const string QueueName = "name_of_queue";
static IQueueClient queueClient;
public static async Task Main(string[] args)
{
const int numberOfMessages = 10;
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
await SendMessagesAsync(numberOfMessages);
await queueClient.CloseAsync();
}
static async Task SendMessagesAsync(int numberOfMessagesToSend)
{
for (var i = 0; i < numberOfMessagesToSend; i++)
{
string messageBody = $"Message {i}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
Console.WriteLine($"Sending message: {messageBody}");
// Send the message to the queue
try
{
await queueClient.SendAsync(message); // this line
}
catch (Exception ex)
{
throw ex;
}
}
}
}
}
当我 运行 控制台上方的代码 window 不执行任何操作时,没有错误消息,什么也没有..!奇怪.. 查看 azure 服务总线概述,它的活动消息计数为零。
我正在使用这个示例项目,但 queueClient.SendAsync 从未 returns 返回。我是否需要在天蓝色中设置一些东西,也许是权限?
https://github.com/Azure/azure-service-bus
最终收到错误
A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
我可以在门户服务总线屏幕中看到请求
这些是你想要的吗?
1.If 您想要从 Azure 服务总线队列获取消息:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
namespace ConsoleApp2
{
class Program
{
const string ServiceBusConnectionString = "xxxxxx";
const string QueueName = "xxx";
static IQueueClient queueClient;
public static async Task Main(string[] args)
{
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
RegisterOnMessageHandlerAndReceiveMessages();
Console.ReadKey();
await queueClient.CloseAsync();
}
static void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 10,
AutoComplete = false
};
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
}
在从服务总线队列接收消息之前,我已将消息放入队列中。 这是结果:
2.If 您想要从 Azure 存储队列获取消息:
using System;
using Microsoft.Azure;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
namespace ReceiveMessageFromAzureStorageQueue
{
class Program
{
static void Main(string[] args)
{
CloudStorageAccount storageAccount = CloudStorageAccount.Parse("xxxxxx");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("xxx");
CloudQueueMessage peekedMessage = queue.PeekMessage();
queue.FetchAttributes();
int? cachedMessageCount = queue.ApproximateMessageCount;
Console.WriteLine("Number of messages in queue: " + cachedMessageCount);
for(int i=0; i<cachedMessageCount; i++) {
System.Threading.Thread.Sleep(10);
CloudQueueMessage retrievedMessage = queue.GetMessage(TimeSpan.FromMilliseconds(10));
Console.WriteLine(retrievedMessage.AsString);
queue.DeleteMessage(retrievedMessage);
}
Console.WriteLine("Already Read.");
}
}
}
在接收之前仍然将消息放入 azure 存储队列中。 这是结果:
如有更多疑问,请告诉我。