将数据从 API 加载到 BigQuery 的首选方法是什么?

What is the preferred way to load data from an API into BigQuery?

我正在尝试将数据从 REST API 获取到 Google 云平台 (GCP) 上的 BigQuery。实现该目标的最佳方法是什么(不使用任何第三方工具,例如 Funnel.io 或 Supermetrics)?

我能找到的大多数教程都建议将数据作为 CSV 文件写入 Cloud Storage,然后使用 DataFlow 将数据加载到 BigQuery。然而,这似乎有点麻烦。应该有一种方法可以在没有写入 CSV 的中间步骤的情况下做到这一点。这可以(在 GCP 内)实现吗?如果可以,最好的方法是什么?

PS:如果数据的大小与答案相关:我正在尝试加载总共约 10,000 行数据(一次性),其中约 100每天都有新专栏 - 最好每小时更新一次。

根据 documentation:

Currently, you can load data into BigQuery only from Cloud Storage or a readable data source (such as your local machine).

因此,除非您正在加载 Datastore 或 Firestore 导出,否则文件必须位于 Google Cloud Storage 中。 GCS 有以下可用的可读格式:

Avro

CSV

JSON (newline delimited only)

ORC

Parquet

Datastore exports

Firestore exports

您应该知道 limitations for each format. In addition, there are also limitations for load jobs, they are described here

我建议您以一种可读格式从 Rest API 中获取数据,将其存储在 Google Cloud Storage 中,然后使用 Google Transfer Service 加载它进入 BigQuery。因此,没有必要使用 DataFlow。

Cloud Storage Transfer 用于将经常性数据加载直接安排到 BigQuery 中。根据文档,最小加载间隔为 1 小时,我相信这适合您的需要。您可以阅读有关此服务的更多信息 here

希望对您有所帮助。

按照上面@Kolban 的提示,在不使用第三方工具且不将中间文件写入 Google 云存储的情况下将数据从 API 加载到 BigQuery 是可能的,而且确实如此很简单,通过"streaming" data into BigQuery:

rows_to_insert = [(u"Phred Phlyntstone", 32), (u"Wylma Phlyntstone", 29)]

errors = client.insert_rows(table, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")

(来自BQ documentation)

为了准备 JSON 数据,必须将其转换为元组。这是我实现此目的的代码摘录:

# Turn JSON into tuples
data_tuples = []
for key,value in resp_json[product_id].items():
    data_tuples.append((
        value["product_id"],
        value["downloads"]
        )
    )

# Insert into BQ
errors = client.insert_rows(table, data_tuples)
if errors == []:
    print("New rows have been added.")
else:
    print(errors)