在 Azure Function 中接收服务总线消息
Services bus message receive in Azure Function
我创建了一个 ServicesBus 队列并开始发送数千条消息
await queueClient.SendBatchAsync(brokeredMessagesList);
另一方面,我创建了 azure 函数来接收来自 ServicesBus 队列的消息。这是我的代码
public static async Task RunAsync([ServiceBusTrigger("batch-input", AccessRights.Manage,
Connection = "ServiceBusConnection")]BrokeredMessage message, TraceWriter log)
{
string body;
using (var stream = message.GetBody<Stream>())
using (var streamReader = new StreamReader(stream, Encoding.UTF8))
{
body = await streamReader.ReadToEndAsync();
}
SampleMessage sampleMessage = JsonConvert.DeserializeObject<SampleMessage>(body);
await SaveMessageIntoAzureTableAsync(sampleMessage);
//await message.CompleteAsync();
log.Info($"Message body: {sampleMessage}");
}
此代码在我的本地工作,但我对缩放和批处理更好奇。
Azure 函数缩放如何处理队列中的大量消息?我没有看到任何文档
预取计数设置将应用于本地? here 我看到默认预取计数是 100。
我应该什么时候调用完整方法(有或没有这个我的代码都可以工作)?
await message.CompleteAsync();
函数运行时将批量检索消息并并行调用多个函数调用。您可以使用 host.json
文件调整预取计数和一些其他设置。
如果队列大小持续增长,Azure 将启动更多实例,这也会并行处理。
您永远不应该显式调用 CompleteAsync
,这将由运行时为您完成以确保任何成功执行。
我创建了一个 ServicesBus 队列并开始发送数千条消息
await queueClient.SendBatchAsync(brokeredMessagesList);
另一方面,我创建了 azure 函数来接收来自 ServicesBus 队列的消息。这是我的代码
public static async Task RunAsync([ServiceBusTrigger("batch-input", AccessRights.Manage,
Connection = "ServiceBusConnection")]BrokeredMessage message, TraceWriter log)
{
string body;
using (var stream = message.GetBody<Stream>())
using (var streamReader = new StreamReader(stream, Encoding.UTF8))
{
body = await streamReader.ReadToEndAsync();
}
SampleMessage sampleMessage = JsonConvert.DeserializeObject<SampleMessage>(body);
await SaveMessageIntoAzureTableAsync(sampleMessage);
//await message.CompleteAsync();
log.Info($"Message body: {sampleMessage}");
}
此代码在我的本地工作,但我对缩放和批处理更好奇。
Azure 函数缩放如何处理队列中的大量消息?我没有看到任何文档
预取计数设置将应用于本地? here 我看到默认预取计数是 100。
我应该什么时候调用完整方法(有或没有这个我的代码都可以工作)?
await message.CompleteAsync();
函数运行时将批量检索消息并并行调用多个函数调用。您可以使用 host.json
文件调整预取计数和一些其他设置。
如果队列大小持续增长,Azure 将启动更多实例,这也会并行处理。
您永远不应该显式调用 CompleteAsync
,这将由运行时为您完成以确保任何成功执行。