限制 Azure Functions 队列上的并发作业数

Limiting the number of concurrent jobs on Azure Functions queue

我在 Azure 中有一个 Function 应用程序,它会在将项目放入队列时触发。它看起来像这样(大大简化):

public static async Task Run(string myQueueItem, TraceWriter log)
{
    using (var client = new HttpClient())
    {
        client.BaseAddress = new Uri(Config.APIUri);
        client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

        StringContent httpContent = new StringContent(myQueueItem, Encoding.UTF8, "application/json");
        HttpResponseMessage response = await client.PostAsync("/api/devices/data", httpContent);
        response.EnsureSuccessStatusCode();

        string json = await response.Content.ReadAsStringAsync();
        ApiResponse apiResponse = JsonConvert.DeserializeObject<ApiResponse>(json);

        log.Info($"Activity data successfully sent to platform in {apiResponse.elapsed}ms.  Tracking number: {apiResponse.tracking}");
    }
}

这一切都很好并且运行良好。每次将项目放入队列时,我们都会将数据发送到我们这边的某个 API 并记录响应。酷

当 "the thing that generates queue messages" 中有一个大峰值并且大量项目同时放入队列时,就会出现问题。这往往会在一分钟内发生大约 1,000 - 1,500 个项目。错误日志将包含如下内容:

2017-02-14T01:45:31.692 mscorlib: Exception while executing function: Functions.SendToLimeade. f-SendToLimeade__-1078179529: An error occurred while sending the request. System: Unable to connect to the remote server. System: Only one usage of each socket address (protocol/network address/port) is normally permitted 123.123.123.123:443.

起初,我认为这是 Azure Function 应用 运行 超出本地套接字的问题,如 illustrated here。但是,然后我注意到了 IP 地址。 IP 地址 123.123.123.123(当然在此示例中已更改)是我们的 IP 地址,即 HttpClient 发送到的 IP 地址。所以,现在我想知道是否 我们的 服务器 运行 没有套接字来处理这些请求。

无论哪种方式,我们这里都存在缩放问题。我正在尝试找出解决它的最佳方法。

一些想法:

  1. 如果是本地套接字限制,article above 有使用 Req.ServicePoint.BindIPEndPointDelegate 增加本地端口范围的示例。这看起来很有希望,但是当您 确实 需要扩展时,您会怎么做?我不希望这个问题在 2 年内再次出现。
  2. 如果是远程限制,看来我可以控制 Functions 运行时一次处理多少条消息。这里有一篇有趣的文章说您可以将 serviceBus.maxConcurrentCalls 设置为 1 并且一次只会处理一条消息。也许我可以将其设置为相对较低的数字。现在,在某些时候我们的队列将比我们处理它们的速度更快地填满,但那时答案是在我们这边添加更多服务器。
  3. 多个 Azure Functions 应用程序?如果我有多个 Azure Functions 应用程序并且它们都在同一个队列上触发,会发生什么情况? Azure 是否足够聪明,可以在 Function 应用程序之间分配工作,我可以让一大群机器处理我的队列,可以根据需要扩大或缩小?
  4. 我也遇到过保活。在我看来,如果我能以某种方式在队列消息涌入时保持我的套接字打开,它可能会有很大帮助。这可能吗,关于我如何去做的任何提示?

如果您对此类系统的推荐(可扩展!)设计有任何见解,我们将不胜感激!

如果您在专用网络应用程序上使用消费计划而不是 Functions,#3 或多或少会立即出现。函数将检测到您有大量消息队列并添加实例直到队列长度稳定。

maxConcurrentCalls 仅适用于每个实例,允许您限制 per-instance 并发。基本上,你的处理速度是maxConcurrentCalls * instanceCount.

控制全局吞吐量的唯一方法是在您选择大小的专用 Web 应用程序上使用 Functions。每个应用程序都会轮询队列并根据需要抓取工作。

最好的扩展解决方案将改进 123.123.123.123 上的负载平衡,以便它可以处理来自 Functions 扩展 up/down 的任意数量的请求以满足队列压力。

Keep alive afaik 对于持久连接很有用,但函数执行不被视为持久连接。将来我们会尝试将 'bring your own binding' 添加到 Functions,如果您愿意,这将允许您实现连接池。

我想我已经找到解决办法了。在过去的 3 小时 6 小时内,我已经 运行 这些更改,并且我的套接字错误为零。在我每 30 分钟左右大量收到这些错误之前。

首先,我添加了一个新的 class 来管理 HttpClient。

public static class Connection
{
    public static HttpClient Client { get; private set; }

    static Connection()
    {
        Client = new HttpClient();

        Client.BaseAddress = new Uri(Config.APIUri);
        Client.DefaultRequestHeaders.Add("Connection", "Keep-Alive");
        Client.DefaultRequestHeaders.Add("Keep-Alive", "timeout=600");
        Client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
    }
}

现在,我们有一个 HttpClient 的静态实例,用于每次调用该函数。根据我的研究,强烈建议尽可能长时间地保留 HttpClient 实例,一切都是线程安全的,并且 HttpClient 将 queue 增加请求并优化对同一主机的请求。请注意,我还设置了 Keep-Alive headers(我认为这是默认设置,但我想我会隐含)。

在我的函数中,我只是获取静态 HttpClient 实例,例如:

var client = Connection.Client;
StringContent httpContent = new StringContent(myQueueItem, Encoding.UTF8, "application/json");
HttpResponseMessage response = await client.PostAsync("/api/devices/data", httpContent);
response.EnsureSuccessStatusCode();

我还没有真正对套接字级别发生的情况进行任何 in-depth 分析(我将不得不询问我们的 IT 人员是否能够在负载均衡器上看到此流量),但我希望它只保持一个套接字对我们的服务器开放,并在处理 queue 项时进行一堆 HTTP 调用。无论如何,无论它在做什么似乎都在起作用。也许有人对如何改进有一些想法。

我知道这个问题很久以前就得到了回答,但与此同时,Microsoft 已经记录了您使用的反模式。

Improper Instantiation antipattern

我认为代码错误是因为:using (var client = new HttpClient())

引自Improper instantiation antipattern:

this technique is not scalable. A new HttpClient object is created for each user request. Under heavy load, the web server may exhaust the number of available sockets.