当消息在 ServiceBusTrigger Azure Function 中处理失败时,我如何延迟处理相同消息 x 分钟?

When a message fails to process in a ServiceBusTrigger Azure Function, how can I delay processing the same message for x minutes?

我有一个从 ServiceBus 主题读取并调用第 3 方服务的 azure 函数。如果服务中断,我想等待 5 分钟,然后再尝试用相同的消息再次调用它。如何添加延迟,以便 azure 函数不会放弃消息并立即再次将其取回?

public static void Run([ServiceBusTrigger("someTopic", 
     "someSubscription", AccessRights.Manage, Connection = 
     "ServiceBusConnection")] BrokeredMessage message) 
{
     CallService(bodyOfBrokeredMessage); //service is down

     //How do I add a delay so the message won't be reprocessed immediately thus quickly exhausting it's max delivery count?
}

一个选项是创建一条新消息并将该消息提交到队列,但将 ScheduledEnqueueTimeUtc 设置为五分钟后。

        [FunctionName("DelayMessage")]
        public static async Task DelayMessage(
            [ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
            [ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,
            TraceWriter log)
        {
            //handle any kind of error scenerio

            var newMessage = originalMessage.Clone();

            newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);

            await newMessages.AddAsync(newMessage);

        }

正如 Josh 所说,您可以简单地克隆原始消息,设置预定的排队时间,发送克隆并完成原始消息。

很遗憾,发送克隆和完成原件不是原子操作,所以如果处理过程在错误的时刻崩溃,我们再次看到原件的可能性很小。

另一个问题是克隆上的 DeliveryCount 始终为 1,因为这是一条全新的消息。所以我们可以无限地重新提交并且永远不会回避这个消息的死信。

幸运的是,这可以通过添加我们自己的 重新提交计数 作为消息的 属性 来解决:

[FunctionName("DelayMessage")]
public static async Task DelayMessage([ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
            [ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,TraceWriter log)
{
     //handle any kind of error scenerio
     int resubmitCount = originalMessage.Properties.ContainsKey("ResubmitCount") ?  (int)originalMessage.Properties["ResubmitCount"] : 0;
     if (resubmitCount > 5)
     {
         Console.WriteLine("DEAD-LETTERING");
         originalMessage.DeadLetter("Too many retries", $"ResubmitCount is {resubmitCount}");
     }
     else
     {
         var newMessage = originalMessage.Clone();
         newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
         await newMessages.AddAsync(newMessage);
     }
}

更多细节,你可以参考这个article

此外,在 LogicApp 中实现 wait/retry/dequeue 下一个模式非常容易,因为这种类型的流控制正是 LogicApps 的设计目标。请参考这个.

您现在可以使用 fixed delay retry,它于 2020 年 11 月左右添加到 Azure Functions(预览版)。

[FunctionName("MyFunction")]
[FixedDelayRetry(10, "00:05:00")]   // retries with a 5-minute delay
public static void Run([ServiceBusTrigger("someTopic", 
     "someSubscription", AccessRights.Manage, Connection = 
     "ServiceBusConnection")] BrokeredMessage message) 
{
     CallService(bodyOfBrokeredMessage); //service is down
}