Google Cloud Datastore:导出与特定查询匹配的实体

Google Cloud Datastore: Exporting entities that match a certain query

背景

我需要向大约 100 万台设备发送大量通知,我正在使用 Google Cloud Functions 构建它。

在当前设置中,我将每个设备令牌作为 PubSub 消息排队:

此过程由人工上传包含所有令牌的 CSV 文件手动启动。内置重试原则上应该足够了,但我想确保如果云功能本身或 APNs/FCM 出现问题,我可以 return 中所有失败令牌的 CSV与上传的格式相同,以便用户只能重试失败的 when/if 他们认为这是个好主意。

我 运行 将通知作为我用于查询通知状态的作业的一部分。为此,我在 job_idstatus 上设置了一个复合索引,并且我 运行 查询了所有匹配的通知,并希望将其作为文件流式传输给用户或将其存储在 Google 云存储中,以便用户可以从那里下载。

问题

假设接近通知总数的事情失败并且我想在一个文件中获取所有标记,我的第一个实现只是遍历所有匹配的条目并构建结果。问题是,以这种方式检索每个 100_000 条目大约需要 1 分钟。对于接近所有通知的东西,我会超出 Cloud Function 的最大超时时间。每个实体总共大约 300 字节,这使得整个导出大约 300MB。我可以通过添加一个更大的索引来将其减小到大约 half/two-thirds 的大小,这样我就可以只对我想要的字段进行投影。

我能想到的唯一选择是将通知分片​​以将整个组分成 100 个分片,创建 100 个文件,每个文件包含 10k 条通知,然后将它们全部下载并在用户尝试下载时将它们拼接在一起文件。

我发布这个问题的原因是,这感觉像是一个相对简单的问题,而且这个解决方案感觉比我预期的要复杂一些,所以我想我可能遗漏了一些东西。

问题

代码

为清楚起见,这是我正在 运行 的代码片段,我只是迭代了此 return 的响应以生成输出。

    def get_failures(job_id):
        query = client.query(kind = Notification.kind)
        query.add_filter('job_id', '=', str(job_id))
        query.add_filter('status', '=', "failure")
        return query.fetch()

此问题的可靠解决方案是使用 Google 数据流。我目前使用它来完成此操作,在 Google Cloud Storage 中生成 csv 文件,其中包含与给定数据存储查询匹配的所有 ~500k 记录。

不过,设置它可能有点复杂。

在开始之前,我使用了 Google 任务队列,它有 10 分钟的超时而不是 30 秒的超时。我不确定您是否可以纯粹在云函数中执行此操作,或者您是否需要启动一个简单的应用程序引擎项目来充当这些任务的请求处理程序