Azure 排队服务总线超过百万条记录的功能没有到达终点

Azure function of queueing to service bus over million record does not reach to the end

我正在尝试实施一个定期(每周一次)运行并调用外部 api 的解决方案,其中包含 1,500,000 个项目元数据 {{domain}}/items,然后尝试找出每个项目如果项目需要根据某种任意逻辑更新或插入到数据库中。

经过几次 attempts 我最终实现了一个具有两个 azure 函数(一个用于入队,另一个用于出队)的服务总线解决方案。

第一个 azure 函数定期触发并调用外部 api 获取 150 万个项目的元数据(高级计划)- 每个项目约为 1.9 KB:

[FunctionName("EnqueueFooMetadata")]
public async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
[ServiceBus("foosmetadata", Connection = "ServiceBusConnection")] IAsyncCollector<FooMetadata> foosMetadataQueue)
{
    IEnumerable<FooMetadata> foosMetadata = await _service.GetFoosMetadata();

    this._logger.LogTrace($"Start Enqueue {foosMetadata.Count()}");
    await Task.Run(() =>
    {
        Parallel.ForEach(foosMetadata, new ParallelOptions() { }, async (FooMetadata fooMetadata) =>
        {
            await foosMetadataQueue.AddAsync(fooMetadata);
        });
    });
    this._logger.LogTrace($"Done Enqueue {foosMetadata.Count()}");
}

在服务总线的另一端有一个绑定到它的函数:

[FunctionName("DequeueGiataProperties")]
public async Task Run([ServiceBusTrigger("foosmetadata", Connection = "ServiceBusConnection")] FooMetadata foo)
{
    var getGiataProperiesResult = await _service.Dequeue(foo);

    this._logger.LogTrace($"dequeing item: {foo.Id}, was done successfully.");
}

对于少量项目(当 IEnumerable<FooMetadata> foosMetadata = await _service.GetFoosMetadata(); 中的 foosMetadata 的计数约为 15,000 时)它按预期工作,并且我可以看到 Done Enqueue... 的踪迹,但是对于大量的项目,它总是停在中间的某个地方,我看不到痕迹。

我不想转移建议的答案,但它看起来像是 azure 函数的超时问题。 有什么处理大数据问题的建议

我认为您遇到了很多问题,其中大部分都在代码的发布者部分。

  1. Parallel.Foreeach 不是异步的,编译器允许你 写asyc代码,但是Parrallel.ForEach其实是同步的 特征。您在 Parallel.ForEach 中使用异步 lambas,它将 有意想不到的行为。
  2. 第二个问题可能是您的 Azure 函数的超时问题。 根据计划,您最多有 5(分钟消费 计划)和(付费计划 20 分钟)供您完成功能。 在 150 万次中调用 API 你期望它 将在那个时间范围内完成,很可能是 调用 API 甚至 1/10 秒的开销都会破坏 时间限制。

有很多方法可以打破 Parallel.ForeEach,主要是切换到使用基于任务的并行机制并结合 DataFlow ActionBlock 等机制。

考虑到您正在拨打的 API 个电话的数量,时间问题可能更难解决,但是

  1. 服务总线在添加消息时支持批处理,您可以在其中 一次将多条消息添加到队列中,你提到你在 高级计划,允许多条最大 1M 的消息 立即发布到服务总线。这个简单的改变可能会给你 足够的性能来发布您的所有消息。

如果没有完整的代码示例和消息大小样本,就很难对您提出的问题给出明确的答案。 因此,我建议您提供一个完整的工作示例,以帮助其他人尝试解决您面临的问题。

在单个函数调用中将 150 万个项目转换为消息听起来像是罪魁祸首。前面提到的并行 foreachTask.Run 也无济于事。结合批处理 IAsyncCollector 难怪它会停滞不前。此处的问题可能还在于尝试发送的消息的总体大小以及 Functions SDK 中的底层实现。每个项目 60 个字节,让我们平均再计算 40 个字节的开销(headers、系统属性、AMQP 额外),即 150,000,000 个字节或 143 MB。

我建议的是以下几个选项:

  1. 如果可能,减少调用返回的项目数。
  2. 否则,将批处理分成更小的块并将这些块作为几条消息发送。这也将提高可靠性,因为您的 HTTP 请求最终将被转换为一系列可靠处理的消息。

另一种选择是调查冲洗 IAsyncCollector 以强制其发送较小的批次。如果不可能,请使用您自己的消息发件人。最后,当您使用 in-process SDK 时,您可以利用服务总线功能扩展 (Microsoft.Azure.WebJobs.Extensions.ServiceBus) 的预览,该扩展几乎已过期,目前为 5.0.0-beta.5。通过此版本,您将能够使用 Azure 服务总线的最新 SDK 和安全批处理 built-in (ServiceBusMessageBatch)。