自动将数据上传到 Google Cloud Storage 和 BigQuery
Automate data upload to Google Cloud Storage and BigQuery
我正在尝试将一些数据从我的应用程序传输到 Big Query,这样我就可以对其进行 运行 SQL 查询。以下是我正在执行的步骤,想知道是否有工具可以自动执行此过程。
从终端手动下载应用程序数据,格式为JSON。
运行 python 脚本将此文件解析为可读 JSON 并进行修改(例如,将空格替换为下划线等,以便能够上传到 Google 云存储 (GCS)。
将此更新后的文件手动上传到 GCS。
使用来自 GCS 的数据在 Big Query 中使用模式手动生成数据集和表。
有没有办法使用云工具自动化整个过程?我查看了 Google Dataflow,但它似乎不是解决此问题的工具。还是我应该有一个 python 程序来完成上述所有操作?
请指教
了解 apache airflow 和此框架的 google 云版本:composer。您可以为每个任务设置一个工作流程,这是您提到的一个步骤。
Airflow 支持很多运算符:BashOperator、PythonOperator、BigQuery、...
为了解决您的问题,我将编写 3 项服务(例如 Cloud Functions 或 Cloud 运行。它是无服务器的,即用即付,并且有免费套餐,您的数量无需支付任何费用结束)。
- 服务 1:下载文件并将其作为原始文件存储在云存储中
- 服务 2:从存储中获取文件,转换文件并将其作为干净文件存储回存储
- 服务 3:在 BigQuery 中加载数据。
这里是事件驱动架构中的流程:
- 创建一个云调度程序,当您需要下载文件时调用您的服务 1(每天一次,每小时一次,...)
- 当原始文件存储在云存储中时,会发出一个事件。抓住它触发你的服务 2(有 2 种方式可以实现,如果你不知道如何,请告诉我)
- 当干净的文件存储在云存储中时,会发出一个事件。抓住它触发你的服务 3
您也可以使用 Cloud Workflow 通过编排实现相同的目的
- 在 Cloud Workflow 上部署您的工作流程
- 创建一个云调度程序,当您需要下载文件时调用您的工作流(每天一次,每小时一次,...)
- 让工作流程运行:
- 调用服务 1。
- 服务1结束后调用服务2
- 服务2结束后调用服务3
首先,您可以在同一个服务中执行这 3 个服务的所有操作。 POC 更容易。但是对于您的解决方案的可扩展性和演进性,限制每个服务的责任是更好的设计。
编辑 1
要从云存储中捕获事件,您有多种解决方案
- 如果您使用 Cloud Functions,则可以 plug it directly on GCS event。 它被命名为后台函数
- 如果您使用 http Cloud Functions 或 Cloud 运行,您可以 publish the Cloud Storage events in PubSub. Then, on PubSub, you can create a push subscription to call an HTTP endpoint(http Cloud Functions 或 Cloud 运行)。注意这里的安全。如果你被卡住了,再问我一次。 这是我最喜欢也是最通用的解决方案
- 如果您使用 Cloud 运行,您还可以使用 Eventarc 来捕获 Storage 事件并调用 Cloud 运行 服务。 EventArc 在“一键部署”中为您包装了之前的解决方案
我正在尝试将一些数据从我的应用程序传输到 Big Query,这样我就可以对其进行 运行 SQL 查询。以下是我正在执行的步骤,想知道是否有工具可以自动执行此过程。
从终端手动下载应用程序数据,格式为JSON。
运行 python 脚本将此文件解析为可读 JSON 并进行修改(例如,将空格替换为下划线等,以便能够上传到 Google 云存储 (GCS)。
将此更新后的文件手动上传到 GCS。
使用来自 GCS 的数据在 Big Query 中使用模式手动生成数据集和表。
有没有办法使用云工具自动化整个过程?我查看了 Google Dataflow,但它似乎不是解决此问题的工具。还是我应该有一个 python 程序来完成上述所有操作?
请指教
了解 apache airflow 和此框架的 google 云版本:composer。您可以为每个任务设置一个工作流程,这是您提到的一个步骤。 Airflow 支持很多运算符:BashOperator、PythonOperator、BigQuery、...
为了解决您的问题,我将编写 3 项服务(例如 Cloud Functions 或 Cloud 运行。它是无服务器的,即用即付,并且有免费套餐,您的数量无需支付任何费用结束)。
- 服务 1:下载文件并将其作为原始文件存储在云存储中
- 服务 2:从存储中获取文件,转换文件并将其作为干净文件存储回存储
- 服务 3:在 BigQuery 中加载数据。
这里是事件驱动架构中的流程:
- 创建一个云调度程序,当您需要下载文件时调用您的服务 1(每天一次,每小时一次,...)
- 当原始文件存储在云存储中时,会发出一个事件。抓住它触发你的服务 2(有 2 种方式可以实现,如果你不知道如何,请告诉我)
- 当干净的文件存储在云存储中时,会发出一个事件。抓住它触发你的服务 3
您也可以使用 Cloud Workflow 通过编排实现相同的目的
- 在 Cloud Workflow 上部署您的工作流程
- 创建一个云调度程序,当您需要下载文件时调用您的工作流(每天一次,每小时一次,...)
- 让工作流程运行:
- 调用服务 1。
- 服务1结束后调用服务2
- 服务2结束后调用服务3
首先,您可以在同一个服务中执行这 3 个服务的所有操作。 POC 更容易。但是对于您的解决方案的可扩展性和演进性,限制每个服务的责任是更好的设计。
编辑 1
要从云存储中捕获事件,您有多种解决方案
- 如果您使用 Cloud Functions,则可以 plug it directly on GCS event。 它被命名为后台函数
- 如果您使用 http Cloud Functions 或 Cloud 运行,您可以 publish the Cloud Storage events in PubSub. Then, on PubSub, you can create a push subscription to call an HTTP endpoint(http Cloud Functions 或 Cloud 运行)。注意这里的安全。如果你被卡住了,再问我一次。 这是我最喜欢也是最通用的解决方案
- 如果您使用 Cloud 运行,您还可以使用 Eventarc 来捕获 Storage 事件并调用 Cloud 运行 服务。 EventArc 在“一键部署”中为您包装了之前的解决方案