如何使用 Dataflow 将数据从 Google Pub/Sub 批量处理到 Cloud Storage?
How to batch process data from Google Pub/Sub to Cloud Storage using Dataflow?
我正在构建一个变更数据捕获管道,该管道从 MYSQL 数据库读取数据并在 BigQuery 中创建一个副本。我将推送 Pub/Sub 中的更改并使用 Dataflow 将它们传输到 Google 云存储。我已经能够弄清楚如何流式传输更改,但我需要 运行 对数据库中的几个表进行批处理。
Dataflow 可以用于 运行 批处理作业,同时从 Pub/Sub 等无限源读取数据吗?我可以 运行 此批处理作业将数据从 Pub/Sub 传输到 Cloud Storage,然后将此数据加载到 BigQuery 吗?我想要批处理作业,因为流作业成本更高。
谢谢你的精确。
首先,当您在 Dataflow(Beam 框架)中使用 PubSub 时,它只能在 streaming mode
中使用
Cloud Pub/Sub sources and sinks are currently supported only in streaming pipelines, during remote execution.
如果您的流程不需要实时,您可以跳过数据流并节省资金。您可以将 Cloud Functions 或 Cloud 运行 用于我建议的过程(如果您愿意,也可以使用 App Engine,但不是我的第一个建议)。
在这两种情况下,创建一个由 Cloud Scheduler.
定期(每周?)触发的流程(Cloud 运行 或 Cloud Function)
解决方案 1
- 将您的进程连接到请求订阅
- 每次您读取一条消息(或一大块消息,例如 1000 条)时,将流写入 BigQuery。 -> 但是,大查询上的流写入不是免费的(每 Gb 0.05 美元)
- 循环直到队列为空。将超时设置为最大值(Cloud Function 为 9 分钟,Cloud 为 15 分钟 运行)以防止任何超时问题
解决方案 2
- 将您的进程连接到请求订阅
- 读取一大块消息(例如 1000 条)并将它们写入内存(写入数组)。
- 循环直到队列为空。将超时设置为最大值(Cloud Function 为 9 分钟,Cloud 运行 为 15 分钟)以防止出现任何超时问题。还将内存设置为最大值 (2Gb) 以防止内存不足崩溃。
- 从您的内存数据数组创建一个加载到 BigQuery 的作业。 -> 这里的加载作业是免费的,您每天和每个 table.
限制为 1000 个加载作业
但是,如果您的应用 + 数据大小大于 ma 内存值,此解决方案可能会失败。另一种方法是每隔例如每 100 万行(取决于每行的大小和内存占用)在 GCS 中创建一个文件。使用唯一前缀命名文件,例如当天的日期 (YYYYMMDD-tempFileXX),并在每次创建文件时递增 XX。然后,创建一个加载作业,不是从内存中的数据,而是使用文件名中带有通配符 (gs://myBucket/YYYYMMDD-tempFile*
) 的 GCS 中的数据。这样所有与前缀匹配的文件都会被加载。
建议 PubSub 消息在 pubsub 订阅中最多保留 7 天。我建议您至少每 3 天触发一次流程,以便在将消息删除到订阅中之前有时间做出反应和调试。
个人经验 流写入 BigQuery 对于小数据量来说很便宜。对于一些美分,我建议您考虑第一个解决方案,即您可以为此付费。管理和代码是smaller/easier!
我正在构建一个变更数据捕获管道,该管道从 MYSQL 数据库读取数据并在 BigQuery 中创建一个副本。我将推送 Pub/Sub 中的更改并使用 Dataflow 将它们传输到 Google 云存储。我已经能够弄清楚如何流式传输更改,但我需要 运行 对数据库中的几个表进行批处理。
Dataflow 可以用于 运行 批处理作业,同时从 Pub/Sub 等无限源读取数据吗?我可以 运行 此批处理作业将数据从 Pub/Sub 传输到 Cloud Storage,然后将此数据加载到 BigQuery 吗?我想要批处理作业,因为流作业成本更高。
谢谢你的精确。
首先,当您在 Dataflow(Beam 框架)中使用 PubSub 时,它只能在 streaming mode
中使用Cloud Pub/Sub sources and sinks are currently supported only in streaming pipelines, during remote execution.
如果您的流程不需要实时,您可以跳过数据流并节省资金。您可以将 Cloud Functions 或 Cloud 运行 用于我建议的过程(如果您愿意,也可以使用 App Engine,但不是我的第一个建议)。
在这两种情况下,创建一个由 Cloud Scheduler.
定期(每周?)触发的流程(Cloud 运行 或 Cloud Function)解决方案 1
- 将您的进程连接到请求订阅
- 每次您读取一条消息(或一大块消息,例如 1000 条)时,将流写入 BigQuery。 -> 但是,大查询上的流写入不是免费的(每 Gb 0.05 美元)
- 循环直到队列为空。将超时设置为最大值(Cloud Function 为 9 分钟,Cloud 为 15 分钟 运行)以防止任何超时问题
解决方案 2
- 将您的进程连接到请求订阅
- 读取一大块消息(例如 1000 条)并将它们写入内存(写入数组)。
- 循环直到队列为空。将超时设置为最大值(Cloud Function 为 9 分钟,Cloud 运行 为 15 分钟)以防止出现任何超时问题。还将内存设置为最大值 (2Gb) 以防止内存不足崩溃。
- 从您的内存数据数组创建一个加载到 BigQuery 的作业。 -> 这里的加载作业是免费的,您每天和每个 table. 限制为 1000 个加载作业
但是,如果您的应用 + 数据大小大于 ma 内存值,此解决方案可能会失败。另一种方法是每隔例如每 100 万行(取决于每行的大小和内存占用)在 GCS 中创建一个文件。使用唯一前缀命名文件,例如当天的日期 (YYYYMMDD-tempFileXX),并在每次创建文件时递增 XX。然后,创建一个加载作业,不是从内存中的数据,而是使用文件名中带有通配符 (gs://myBucket/YYYYMMDD-tempFile*
) 的 GCS 中的数据。这样所有与前缀匹配的文件都会被加载。
建议 PubSub 消息在 pubsub 订阅中最多保留 7 天。我建议您至少每 3 天触发一次流程,以便在将消息删除到订阅中之前有时间做出反应和调试。
个人经验 流写入 BigQuery 对于小数据量来说很便宜。对于一些美分,我建议您考虑第一个解决方案,即您可以为此付费。管理和代码是smaller/easier!