"MaxAutoRenewDuration" 在 Azure 服务总线中的作用是什么?

What is the role of "MaxAutoRenewDuration" in azure service bus?

我正在使用 Microsoft.Azure.ServiceBus。 (doc)

我遇到了一个例外:

The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue.

借助这些问题:

, , ,

我可以通过将 AutoComplete 设置为 false 并将 Azure 的队列锁定持续时间增加到最大值(从 30 秒到 5 分钟)来避免 Exception

_queueClient.RegisterMessageHandler(ProcessMessagesAsync, new 
                         MessageHandlerOptions(ExceptionReceivedHandler)
                         {
                             MaxConcurrentCalls = 1,
                             MaxAutoRenewDuration = TimeSpan.FromSeconds(10),
                             AutoComplete = false
                         }
);

private async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
    await ProccesMessage(message);
}

private async Task ProccesMessage(Message message)
{
    //The complete should be closed before long-timed process
    await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
    await DoFoo(message.Body); //some long running process
}

我的问题是:

您需要考虑一些事项。

  1. 锁定时长
  2. 从代理获取消息以来的总时间

锁定持续时间很简单 - 单个竞争消费者可以租用一条消息多长时间 w/o 将该消息租给任何其他竞争消费者。

总时间有点诡异。注册接收消息的回调 ProcessMessagesAsync 并不是唯一涉及的事情。在您提供的代码示例中,您将并发设置为 1。如果配置了预取(队列在每次请求一条消息或多条消息时收到多条消息),服务器上的锁定持续时间时钟开始计时所有这些消息。因此,如果您的处理稍微低于 MaxLockDuration 但对于相同的示例,最后一条预取消息等待处理的时间过长,即使它在小于锁定持续时间的时间内完成,它也可能会失去锁定并且尝试完成该消息时将抛出异常。

这就是 MaxAutoRenewDuration 进入游戏的地方。它所做的是扩展与代理的消息租约,"re-locking" 它用于当前正在处理消息的竞争消费者。 MaxAutoRenewDuration 应设置为 "possibly maximum processing time a lease will be required"。在您的样本中,它被设置为 TimeSpan.FromSeconds(10) ,这是非常低的。需要设置至少比MaxLockDuration长,调整到最长ProccesMessage的时间就需要运行。考虑预取。

为了帮助形象化它,请考虑客户端有一个内存中队列,当您在处理程序中逐条执行消息的串行处理时,可以在其中存储消息。消息从代理到达内存中队列的那一刻起,租约就开始了。如果内存队列中的总时间加上处理时间超过锁定持续时间,则租约丢失。您的选择是:

  1. 通过设置 MaxConcurrentCalls > 1
  2. 启用并发处理
  3. 增加MaxLockDuration
  4. 减少消息预取(如果你使用的话)
  5. 配置MaxAutoRenewDuration更新锁并克服MaxLockDuration约束

关于 #4 的注意事项 - 这不是一个有保证的操作。因此,对代理的调用有可能会失败,并且不会延长消息锁定。我建议将您的解决方案设计为在锁定持续时间限制内工作。或者,保留消息信息,这样您的处理就不必受消息传递的限制。