如何查看 Azure 服务总线队列中的所有消息?

How to peek all messages in Azure Service Bus queue?

我想查看来自多个 Azure 服务总线队列的所有消息。之后,我想在 queueName、insertDate 之后过滤它们,并提供对正文进行全文搜索的机会。

目前,我正在使用 Microsoft.Azure.ServiceBus 包创建一个 ManagementClient 来收集队列信息,然后使用 MessageReceiver 来查看消息。

var managementClient = new ManagementClient(connectionString);

var queue = await managementClient.GetQueueRuntimeInfoAsync(queueName);

var count = queue.MessageCount;

var receiver = new MessageReceiver(connectionString, queueName);

var messagesOfQueue = new List<Message>();

for (var i = 1; i <= count; i++)
{
   messagesOfQueue.Add(await receiver.PeekAsync());
}

有没有更好的方法来获取所有消息?或者有没有办法只查看适用于过滤器的消息?

我还尝试使用 WindowsAzure.ServiceBus 包中的 QueueClient.PeekBatch 方法。但是尽管我设置了正确的 messageCount 参数,但该方法并没有 return 所有消息。

然后还有包Azure.Messaging.ServiceBus...这些包是怎么回事?

那么我应该使用哪个包以及基于某些过滤器查看队列消息的最佳方式是什么?

目前您收到来自收件人的一条消息。更好的选择是使用 PeekBatchAsync(Int64, Int32) method of MessageReceiver.

批量接收消息

下面是示例代码(虽然未经测试):

var messagesOfQueue = new List<Message>();
var sequenceNumber = 0;
var batchSize = 100;//number of messages to receive in a single call
do
{
    var messages = await receiver.PeekBatchAsync(sequenceNumber, batchSize);
    messagesOfQueue.AddRange(messages);
    if (messages.Count > 0)
    {
        sequenceNumber = messages[messages.Count-1].SequenceNumber;
    }
    else
    {
        break;
    }
} while (true);

我目前正在使用的并且按预期工作的解决方案如下所示:

var receiver = serviceBusClient.CreateReceiver(queueName);

var messagesOfQueue = new List<ServiceBusReceivedMessage>();
var previousSequenceNumber = -1L;
var sequenceNumber = 0L;

do
{
  var messageBatch = await receiver.PeekMessagesAsync(int.MaxValue, sequenceNumber);

  if (messageBatch.Count > 0)
  {
    sequenceNumber = messageBatch[^1].SequenceNumber;

    if (sequenceNumber == previousSequenceNumber)
      break;

    messagesOfQueue.AddRange(messageBatch);

    previousSequenceNumber = sequenceNumber;
  }
  else
  {
    break;
  }
} while (true);

它使用了 nuget 包 Azure.Messaging.ServiceBus

该解决方案避免了两次获取具有相同 SequenceNumber 的消息。

序号单调递增。我已经测试了大多数情况,除了在达到最大值时将 sequenceNumber 滚动到 0 (Long.MaxValue)。

using Azure.Messaging.ServiceBus;

private static async Task<List<ServiceBusReceivedMessage>> PeekAllMessages(string serviceBusConnectionString, string queueName)
{
    var client = new ServiceBusClient(serviceBusConnectionString);
    var receiver = client.CreateReceiver(queueName);

    var messages = new List<ServiceBusReceivedMessage>();

    var batchSize = 20;
    var sequenceNumber = 0L;
    do
    {
        var messageBatch = await receiver.PeekMessagesAsync(batchSize, sequenceNumber);

        if (messageBatch.Count <= 0)
        {
            break;
        }

        // Increasing the SequenceNumber by 1 to avoid getting the message with the same SequenceNumber twice
        sequenceNumber = messageBatch[^1].SequenceNumber + 1;

        messages.AddRange(messageBatch);

    } while (true);

    return messages;
}