在上传到 BigQuery 之前对 GCS 文件进行反规范化

Denormalize a GCS file before uploading to BigQuery

我在 .Net Core 中编写了一个 Cloud 运行 API,它从 GCS 位置读取文件,然后进行非规范化(即为每一行添加更多信息以包含文本描述)然后将其写入 BigQuery table。我有两个选择:

  1. 我的云 运行 API 可以创建非规范化的 CSV 文件并将它们写入另一个 GCS 位置。然后另一个云 运行 API 可以获取那些非规范化的 CSV 文件并将它们直接写入 BigQuery。
  2. 我的云 运行 API 可以读取原始 CSV 文件,在内存(文件流)中对它们进行非规范化,然后以某种方式从内存中的文件流直接写入 BigQuery table。

如果性能(速度)和成本(货币)是我的目标,那么在这种情况下写入 BigQuery 的最佳方式是什么。在反规范化之前,这些文件每个大约 10KB。每行大约 1000 个字符。去规范化后,它大约是原来的三倍。在 BigQuery 中成功加载后,我不需要保留非规范化文件。我担心性能,以及 inserts/writes 左右的任何特定 BigQuery 每日配额。除非您正在执行 DML 语句,否则我认为没有,但如果我错了,请纠正我。

我会使用将文件上传到存储桶时触发的 Cloud Functions。

Google 有一个 repo a tutorial just for this for JSON files Streaming data from Cloud Storage into BigQuery using Cloud Functions

太常见了

然后,我将修改示例 main.py 文件来自:

def streaming(data, context):
    '''This function is executed whenever a file is added to Cloud Storage'''
    bucket_name = data['bucket']
    file_name = data['name']
    db_ref = DB.document(u'streaming_files/%s' % file_name)
    if _was_already_ingested(db_ref):
        _handle_duplication(db_ref)
    else:
        try:
            _insert_into_bigquery(bucket_name, file_name)
            _handle_success(db_ref)
        except Exception:
            _handle_error(db_ref)

接受 CSV 文件:

import json
import csv
import logging
import os
import traceback
from datetime import datetime

from google.api_core import retry
from google.cloud import bigquery
from google.cloud import storage
import pytz



PROJECT_ID = os.getenv('GCP_PROJECT')
BQ_DATASET = 'fromCloudFunction'
BQ_TABLE = 'mytable'

CS = storage.Client()
BQ = bigquery.Client()


def streaming(data, context):
    '''This function is executed whenever a file is added to Cloud Storage'''
    bucket_name = data['bucket']
    file_name = data['name']

    newRows = postProcessing(bucket_name, file_name)

    # It is recommended that you save 
    # what you process for debugging reasons.
    destination_bucket = 'post-processed' # gs://post-processed/
    destination_name = file_name
    # saveRowsToBucket(newRows,destination_bucket,destination_name)
    rowsInsertIntoBigquery(newRows)



class BigQueryError(Exception):
    '''Exception raised whenever a BigQuery error happened''' 

    def __init__(self, errors):
        super().__init__(self._format(errors))
        self.errors = errors

    def _format(self, errors):
        err = []
        for error in errors:
            err.extend(error['errors'])
        return json.dumps(err)

def postProcessing(bucket_name, file_name):
    blob = CS.get_bucket(bucket_name).blob(file_name)
    my_str = blob.download_as_string().decode('utf-8')
    csv_reader = csv.DictReader(my_str.split('\n'))                                                                   
    newRows = []
    for row in csv_reader:
        modified_row = row # Add your logic
        newRows.append(modified_row)
    return newRows

def rowsInsertIntoBigquery(rows):
    table = BQ.dataset(BQ_DATASET).table(BQ_TABLE)
    errors = BQ.insert_rows_json(table,rows)
    if errors != []:
        raise BigQueryError(errors)

如果需要,仍然需要定义地图(row->newRow) 和函数saveRowsToBucket