当消息在 ServiceBusTrigger Azure Function 中处理失败时,我如何延迟处理相同消息 x 分钟?
When a message fails to process in a ServiceBusTrigger Azure Function, how can I delay processing the same message for x minutes?
我有一个从 ServiceBus 主题读取并调用第 3 方服务的 azure 函数。如果服务中断,我想等待 5 分钟,然后再尝试用相同的消息再次调用它。如何添加延迟,以便 azure 函数不会放弃消息并立即再次将其取回?
public static void Run([ServiceBusTrigger("someTopic",
"someSubscription", AccessRights.Manage, Connection =
"ServiceBusConnection")] BrokeredMessage message)
{
CallService(bodyOfBrokeredMessage); //service is down
//How do I add a delay so the message won't be reprocessed immediately thus quickly exhausting it's max delivery count?
}
一个选项是创建一条新消息并将该消息提交到队列,但将 ScheduledEnqueueTimeUtc 设置为五分钟后。
[FunctionName("DelayMessage")]
public static async Task DelayMessage(
[ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
[ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,
TraceWriter log)
{
//handle any kind of error scenerio
var newMessage = originalMessage.Clone();
newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
await newMessages.AddAsync(newMessage);
}
正如 Josh 所说,您可以简单地克隆原始消息,设置预定的排队时间,发送克隆并完成原始消息。
很遗憾,发送克隆和完成原件不是原子操作,所以如果处理过程在错误的时刻崩溃,我们再次看到原件的可能性很小。
另一个问题是克隆上的 DeliveryCount
将 始终为 1,因为这是一条全新的消息。所以我们可以无限地重新提交并且永远不会回避这个消息的死信。
幸运的是,这可以通过添加我们自己的 重新提交计数 作为消息的 属性 来解决:
[FunctionName("DelayMessage")]
public static async Task DelayMessage([ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
[ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,TraceWriter log)
{
//handle any kind of error scenerio
int resubmitCount = originalMessage.Properties.ContainsKey("ResubmitCount") ? (int)originalMessage.Properties["ResubmitCount"] : 0;
if (resubmitCount > 5)
{
Console.WriteLine("DEAD-LETTERING");
originalMessage.DeadLetter("Too many retries", $"ResubmitCount is {resubmitCount}");
}
else
{
var newMessage = originalMessage.Clone();
newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
await newMessages.AddAsync(newMessage);
}
}
更多细节,你可以参考这个article。
此外,在 LogicApp 中实现 wait/retry/dequeue 下一个模式非常容易,因为这种类型的流控制正是 LogicApps 的设计目标。请参考这个.
您现在可以使用 fixed delay retry,它于 2020 年 11 月左右添加到 Azure Functions(预览版)。
[FunctionName("MyFunction")]
[FixedDelayRetry(10, "00:05:00")] // retries with a 5-minute delay
public static void Run([ServiceBusTrigger("someTopic",
"someSubscription", AccessRights.Manage, Connection =
"ServiceBusConnection")] BrokeredMessage message)
{
CallService(bodyOfBrokeredMessage); //service is down
}
我有一个从 ServiceBus 主题读取并调用第 3 方服务的 azure 函数。如果服务中断,我想等待 5 分钟,然后再尝试用相同的消息再次调用它。如何添加延迟,以便 azure 函数不会放弃消息并立即再次将其取回?
public static void Run([ServiceBusTrigger("someTopic",
"someSubscription", AccessRights.Manage, Connection =
"ServiceBusConnection")] BrokeredMessage message)
{
CallService(bodyOfBrokeredMessage); //service is down
//How do I add a delay so the message won't be reprocessed immediately thus quickly exhausting it's max delivery count?
}
一个选项是创建一条新消息并将该消息提交到队列,但将 ScheduledEnqueueTimeUtc 设置为五分钟后。
[FunctionName("DelayMessage")]
public static async Task DelayMessage(
[ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
[ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,
TraceWriter log)
{
//handle any kind of error scenerio
var newMessage = originalMessage.Clone();
newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
await newMessages.AddAsync(newMessage);
}
正如 Josh 所说,您可以简单地克隆原始消息,设置预定的排队时间,发送克隆并完成原始消息。
很遗憾,发送克隆和完成原件不是原子操作,所以如果处理过程在错误的时刻崩溃,我们再次看到原件的可能性很小。
另一个问题是克隆上的 DeliveryCount
将 始终为 1,因为这是一条全新的消息。所以我们可以无限地重新提交并且永远不会回避这个消息的死信。
幸运的是,这可以通过添加我们自己的 重新提交计数 作为消息的 属性 来解决:
[FunctionName("DelayMessage")]
public static async Task DelayMessage([ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
[ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,TraceWriter log)
{
//handle any kind of error scenerio
int resubmitCount = originalMessage.Properties.ContainsKey("ResubmitCount") ? (int)originalMessage.Properties["ResubmitCount"] : 0;
if (resubmitCount > 5)
{
Console.WriteLine("DEAD-LETTERING");
originalMessage.DeadLetter("Too many retries", $"ResubmitCount is {resubmitCount}");
}
else
{
var newMessage = originalMessage.Clone();
newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
await newMessages.AddAsync(newMessage);
}
}
更多细节,你可以参考这个article。
此外,在 LogicApp 中实现 wait/retry/dequeue 下一个模式非常容易,因为这种类型的流控制正是 LogicApps 的设计目标。请参考这个
您现在可以使用 fixed delay retry,它于 2020 年 11 月左右添加到 Azure Functions(预览版)。
[FunctionName("MyFunction")]
[FixedDelayRetry(10, "00:05:00")] // retries with a 5-minute delay
public static void Run([ServiceBusTrigger("someTopic",
"someSubscription", AccessRights.Manage, Connection =
"ServiceBusConnection")] BrokeredMessage message)
{
CallService(bodyOfBrokeredMessage); //service is down
}