How to do async in an Azure WebJob function

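I have an async method that fetches API data from a server. When I run this code on my local machine, in a console app, it performs at high speed, pushing through a few hundred HTTP calls per minute in the async function. However, when the same code is triggered from an Azure WebJob queue message, it seems to run synchronously and my numbers crawl. I'm sure I'm missing something simple in my approach; any help is appreciated.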

(1) The WebJob function, which listens for a message on the queue and kicks off the API fetch process for the message it receives:

public class Functions
    {
        // This function will get triggered/executed when a new message is written 
        // on an Azure Queue called queue.

        public static async Task ProcessQueueMessage ([QueueTrigger("myqueue")] string message, TextWriter log)
        {
            var getAPIData = new GetData();
            getAPIData.DoIt(message).Wait();
            log.WriteLine("*** done: " + message);
        }
    }

(2) The class that, outside Azure, runs at speed in async mode:

 class GetData
    {
        // wrapper that is called by the message function trigger
        public async Task DoIt(string MessageFile)
        {
            await CallAPI(MessageFile);
        }

        public async Task<string> CallAPI(string MessageFile)
        {
            /// create a list of sample APIs to call...
            var apiCallList = new List<string>();
            apiCallList.Add("localhost/?q=1");
            apiCallList.Add("localhost/?q=2");
            apiCallList.Add("localhost/?q=3");
            apiCallList.Add("localhost/?q=4");
            apiCallList.Add("localhost/?q=5");

            // setup httpclient
            HttpClient client =
                new HttpClient() { MaxResponseContentBufferSize = 10000000 };
            var timeout = new TimeSpan(0, 5, 0); // 5 min timeout
            client.Timeout = timeout;

            // create a list of http api get Task...
            IEnumerable<Task<string>> allResults = apiCallList.Select(str => ProcessURLPageAsync(str, client));
            // wait for them all to complete, then move on...
            await Task.WhenAll(allResults);

            return allResults.ToString();
        }

        async Task<string> ProcessURLPageAsync(string APIAddressString, HttpClient client)
        {
            string page = "";
            HttpResponseMessage resX;

            try
            {
                // set the address to call
                Uri URL = new Uri(APIAddressString);
                // execute the call
                resX = await client.GetAsync(URL);
                page = await resX.Content.ReadAsStringAsync();
                string rslt = page;
                // do something with the api response data
            }
            catch (Exception ex)
            {
                // log error
            }
            return page;
        }

    }

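Have a read of the WebJobs SDK documentation. The behaviour you should expect is that your process will run and handle one message at a time, but will scale up if more instances (of your app service) are created. If you have multiple queues, they will trigger in parallel.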

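To improve performance, see the configuration settings section in the link I sent you, which covers the number of messages that can be triggered in a batch.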

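If you want to process multiple messages in parallel and don't want to rely on instance scaling, you need to use threading instead (async isn't about multi-threaded parallelism, it's about making more efficient use of the thread you're already on). So your queue trigger function should read the message from the queue, create a thread, "fire and forget" that thread, and then return from the trigger function. This marks the message as processed and allows the next message on the queue to be handled, even though in theory you're still working on the earlier one. Note that you will need to include your own error-handling logic and make sure no data is lost if your thread throws an exception or can't process the message (for example, by putting it on a poison queue).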
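As a rough illustration of the "fire and forget" idea above, here is a minimal sketch that uses only the base class library. The names `FireAndForget`, `Start` and `PoisonMessages` are hypothetical and not part of the WebJobs SDK, and the in-memory queue merely stands in for a real poison queue. The task is returned only so that a caller can observe it; a trigger function would ignore it and return straight away.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class FireAndForget
{
    // Hypothetical stand-in for a poison queue. A real job would write
    // failed messages to durable storage instead of process memory.
    public static readonly ConcurrentQueue<string> PoisonMessages =
        new ConcurrentQueue<string>();

    // Starts the work on the thread pool and returns without waiting for it.
    public static Task Start(string message, Func<string, Task> work)
    {
        return Task.Run(async () =>
        {
            try
            {
                await work(message);
            }
            catch (Exception ex)
            {
                // The trigger has already returned, so nothing else will
                // retry this message: record the failure ourselves.
                Console.Error.WriteLine("Failed: " + message + " (" + ex.Message + ")");
                PoisonMessages.Enqueue(message);
            }
        });
    }
}
```

Bear in mind that work started this way is not tracked by the host, so a restart or shutdown can interrupt it after the queue message has already been removed.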

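Another option is to skip the [QueueTrigger] attribute and use the Azure Storage Queue SDK API functions directly to connect to the queue and process messages as your requirements dictate.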

First, because your triggered function is async, you should use await rather than .Wait(). Wait blocks the current thread.

public static async Task ProcessQueueMessage([QueueTrigger("myqueue")] string message, TextWriter log)
{
    var getAPIData = new GetData();
    await getAPIData.DoIt(message);
    log.WriteLine("*** done: " + message);
}
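To make the difference concrete, here is a small sketch in which `Task.Delay` stands in for the HTTP calls. `AwaitDemo`, `HandleAsync` and `HandleAllAsync` are illustrative names, not part of the question's code. Every handler is started first and then awaited together, so no thread sits blocked while the simulated I/O is pending.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AwaitDemo
{
    // Simulated I/O-bound handler (a stand-in for something like GetData.DoIt).
    public static async Task<int> HandleAsync(string message, int delayMs)
    {
        await Task.Delay(delayMs);   // the thread is released while waiting
        return message.Length;
    }

    // Starts every handler, then awaits them all at once.
    public static async Task<int[]> HandleAllAsync(IEnumerable<string> messages, int delayMs)
    {
        // ToList() forces the lazy Select to run, so all tasks start here.
        List<Task<int>> tasks = messages.Select(m => HandleAsync(m, delayMs)).ToList();

        // WhenAll returns the results in the same order as the input tasks.
        return await Task.WhenAll(tasks);
    }
}
```

Awaiting `Task.WhenAll` also hands back the actual results as an array, which is usually more useful than calling `ToString()` on the sequence of tasks.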

In any case, you can find useful information in the documentation:

Parallel execution

If you have multiple functions listening on different queues, the SDK will call them in parallel when messages are received simultaneously.

The same is true when multiple messages are received for a single queue. By default, the SDK gets a batch of 16 queue messages at a time and executes the function that processes them in parallel. The batch size is configurable. When the number being processed gets down to half of the batch size, the SDK gets another batch and starts processing those messages. Therefore the maximum number of concurrent messages being processed per function is one and a half times the batch size. This limit applies separately to each function that has a QueueTrigger attribute.
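The arithmetic in that passage can be sketched as follows. `QueueConcurrency` and `MaxConcurrentMessages` are made-up names used only for illustration, and the calculation assumes the refill threshold is half the batch size, as the quoted text describes.

```csharp
public static class QueueConcurrency
{
    // A new batch is fetched once the in-flight count drops to half the
    // batch size, so the peak is one full batch plus that half batch.
    public static int MaxConcurrentMessages(int batchSize)
    {
        return batchSize + batchSize / 2;
    }
}
```

With the default batch size of 16 this gives 16 + 8 = 24 messages in flight per function.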

Here is sample code for configuring the batch size:

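var config = new JobHostConfiguration();
// BatchSize must be between 1 and 32: a storage queue returns at most
// 32 messages per fetch, and larger values are rejected.
config.Queues.BatchSize = 32;
var host = new JobHost(config);
host.RunAndBlock();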

However, having too many threads running at the same time is not always a good choice and can lead to poor performance.

Another option is to scale out your WebJob:

Multiple instances

If your web app runs on multiple instances, a continuous WebJob runs on each machine, and each machine will wait for triggers and attempt to run functions. The WebJobs SDK queue trigger automatically prevents a function from processing a queue message multiple times; functions do not have to be written to be idempotent. However, if you want to ensure that only one instance of a function runs even when there are multiple instances of the host web app, you can use the Singleton attribute.