何时不在 Azure Functions ServiceBus 绑定中的 IAsyncCollector 上调用 FlushAsync

When not to call FlushAsync on IAsyncCollector in Azure Functions ServiceBus binding

我有一系列 Azure Function 应用程序,它们都使用 IAsyncCollector<Message>:

将消息输出到 Azure 服务总线
public async Task Run([ServiceBus(...)] IAsyncCollector<Message> messages)
{
    ...
    await messages.AddAsync(msg);
}

我不时收到如下所示的错误记录:

Microsoft.Azure.WebJobs.Host.FunctionInvocationException: Exception while executing function: Function
 ---> Microsoft.Azure.ServiceBus.ServiceBusTimeoutException: The operation did not complete within the allocated time 00:00:59.9999536 for object message.Reference: ..., 7/13/2020 2:46:24 PM
 ---> System.TimeoutException: The operation did not complete within the allocated time 00:00:59.9999536 for object message.
   at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.Azure.Amqp.SendingAmqpLink.EndSendMessage(IAsyncResult result)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.Azure.ServiceBus.Core.MessageSender.OnSendAsync(IList`1 messageList)
   --- End of inner exception stack trace ---
   at Microsoft.Azure.ServiceBus.Core.MessageSender.OnSendAsync(IList`1 messageList)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageSender.SendAsync(IList`1 messageList)
   at Microsoft.Azure.WebJobs.ServiceBus.Bindings.MessageSenderExtensions.SendAndCreateEntityIfNotExists(MessageSender sender, Message message, Guid functionInstanceId, EntityType entityType, CancellationToken cancellationToken)
   at My.Function.Run(String mySbMsg, IAsyncCollector`1 messages)

我很难弄清楚这种情况是在什么时候发生的。但是我最近了解到FlushAsync方法:

await messages.AddAsync(msg);
await messages.FlushAsync();

我的问题如下。为什么我永远不会在我的函数中包含对 FlushAsync 的调用?在我自己的代码中获取超时异常将使重试、进行更好的异常日志记录等成为可能。在函数代码中像这样手动刷新有什么缺点吗?

Why would I ever NOT include a call to FlushAsync in my function? Getting the timeout exception in my own code will make it possible to retry, do better exception logging, and more.

我要在这里更进一步地说,在我获得了一些 Azure Functions 的经验之后,我现在完全避免 IAsyncCollector<T>。一些实现在 AddAsync 上发布;其他实现可能会在 AddAsyncFlushAsync 上发布。我怀疑服务总线实现实际上是在 AddAsync 上发布的,在这种情况下 FlushAsync 可能是空的。

IAsyncCollector<T> 的好处在于它为您提供了“编写这些东西”的抽象概念;您所要做的就是提供一个连接字符串,剩下的就是魔法了。 IAsyncCollector<T> 的问题是它给了你一个抽象,因此你的控制力要少得多。

在引擎盖下,重试了多少次?他们是在使用持续延迟还是呈指数增长?如果它永远不会成功,会有什么行为?通常 none 的关键信息会被记录下来。

特别烦人的是 AF 团队 更改了抽象的语义 。例如,对于某些输出绑定(CosmosDB 或存储,我不记得了),重试行为从函数 SDK 的一个版本更改为下一个版本。

所以,我倾向于避免输出绑定,尤其是 IAsyncCollector<T>。我通常想以一分钟左右的上限进行紧密但呈指数增长的去相关抖动重试,但在函数运行时仅剩一分钟时中止,然后恢复行为变为将消息写入错误队列(重试)。这比 IAsyncCollector<T> 所能提供的要复杂得多,但使用 Polly 直接调用 SDK 并不难。

Any downsides of flushing manually like this within the function code?

没有。默认情况下,IAsyncCollector<T>.FlushAsync 在函数执行后由函数宿主调用。所以如果你自己调用它,你只是提前调用它。多次调用应该是安全的。

在您的代码中调用 FlushAsync 没有任何问题。 ServiceBus 输出绑定的 IAsyncCollector 当前实现是您的消息被批处理,直到调用 FlushAsync 或函数 returns。在尝试发送大量消息时,我偶然发现了与您相同的超时异常。我找到的解决方案是每 N 条消息调用 FlushAsync,在我的例子中 N=100 是最好的权衡。过于频繁地调用 FlushAsync 会引发明显的性能损失。