在上传到 BigQuery 之前对 GCS 文件进行反规范化
Denormalize a GCS file before uploading to BigQuery
我在 .Net Core 中编写了一个 Cloud 运行 API,它从 GCS 位置读取文件,然后进行非规范化(即为每一行添加更多信息以包含文本描述)然后将其写入 BigQuery table。我有两个选择:
- 我的云 运行 API 可以创建非规范化的 CSV 文件并将它们写入另一个 GCS 位置。然后另一个云 运行 API 可以获取那些非规范化的 CSV 文件并将它们直接写入 BigQuery。
- 我的云 运行 API 可以读取原始 CSV 文件,在内存(文件流)中对它们进行非规范化,然后以某种方式从内存中的文件流直接写入 BigQuery table。
如果性能(速度)和成本(货币)是我的目标,那么在这种情况下写入 BigQuery 的最佳方式是什么。在反规范化之前,这些文件每个大约 10KB。每行大约 1000 个字符。去规范化后,它大约是原来的三倍。在 BigQuery 中成功加载后,我不需要保留非规范化文件。我担心性能,以及 inserts/writes 左右的任何特定 BigQuery 每日配额。除非您正在执行 DML 语句,否则我认为没有,但如果我错了,请纠正我。
我会使用将文件上传到存储桶时触发的 Cloud Functions。
Google 有一个 repo a tutorial just for this for JSON files Streaming data from Cloud Storage into BigQuery using Cloud Functions。
太常见了
然后,我将修改示例 main.py
文件来自:
def streaming(data, context):
'''This function is executed whenever a file is added to Cloud Storage'''
bucket_name = data['bucket']
file_name = data['name']
db_ref = DB.document(u'streaming_files/%s' % file_name)
if _was_already_ingested(db_ref):
_handle_duplication(db_ref)
else:
try:
_insert_into_bigquery(bucket_name, file_name)
_handle_success(db_ref)
except Exception:
_handle_error(db_ref)
接受 CSV 文件:
import json
import csv
import logging
import os
import traceback
from datetime import datetime
from google.api_core import retry
from google.cloud import bigquery
from google.cloud import storage
import pytz
PROJECT_ID = os.getenv('GCP_PROJECT')
BQ_DATASET = 'fromCloudFunction'
BQ_TABLE = 'mytable'
CS = storage.Client()
BQ = bigquery.Client()
def streaming(data, context):
'''This function is executed whenever a file is added to Cloud Storage'''
bucket_name = data['bucket']
file_name = data['name']
newRows = postProcessing(bucket_name, file_name)
# It is recommended that you save
# what you process for debugging reasons.
destination_bucket = 'post-processed' # gs://post-processed/
destination_name = file_name
# saveRowsToBucket(newRows,destination_bucket,destination_name)
rowsInsertIntoBigquery(newRows)
class BigQueryError(Exception):
'''Exception raised whenever a BigQuery error happened'''
def __init__(self, errors):
super().__init__(self._format(errors))
self.errors = errors
def _format(self, errors):
err = []
for error in errors:
err.extend(error['errors'])
return json.dumps(err)
def postProcessing(bucket_name, file_name):
blob = CS.get_bucket(bucket_name).blob(file_name)
my_str = blob.download_as_string().decode('utf-8')
csv_reader = csv.DictReader(my_str.split('\n'))
newRows = []
for row in csv_reader:
modified_row = row # Add your logic
newRows.append(modified_row)
return newRows
def rowsInsertIntoBigquery(rows):
table = BQ.dataset(BQ_DATASET).table(BQ_TABLE)
errors = BQ.insert_rows_json(table,rows)
if errors != []:
raise BigQueryError(errors)
如果需要,仍然需要定义地图(row->newRow) 和函数saveRowsToBucket
。
我在 .Net Core 中编写了一个 Cloud 运行 API,它从 GCS 位置读取文件,然后进行非规范化(即为每一行添加更多信息以包含文本描述)然后将其写入 BigQuery table。我有两个选择:
- 我的云 运行 API 可以创建非规范化的 CSV 文件并将它们写入另一个 GCS 位置。然后另一个云 运行 API 可以获取那些非规范化的 CSV 文件并将它们直接写入 BigQuery。
- 我的云 运行 API 可以读取原始 CSV 文件,在内存(文件流)中对它们进行非规范化,然后以某种方式从内存中的文件流直接写入 BigQuery table。
如果性能(速度)和成本(货币)是我的目标,那么在这种情况下写入 BigQuery 的最佳方式是什么。在反规范化之前,这些文件每个大约 10KB。每行大约 1000 个字符。去规范化后,它大约是原来的三倍。在 BigQuery 中成功加载后,我不需要保留非规范化文件。我担心性能,以及 inserts/writes 左右的任何特定 BigQuery 每日配额。除非您正在执行 DML 语句,否则我认为没有,但如果我错了,请纠正我。
我会使用将文件上传到存储桶时触发的 Cloud Functions。
Google 有一个 repo a tutorial just for this for JSON files Streaming data from Cloud Storage into BigQuery using Cloud Functions。
太常见了然后,我将修改示例 main.py
文件来自:
def streaming(data, context):
'''This function is executed whenever a file is added to Cloud Storage'''
bucket_name = data['bucket']
file_name = data['name']
db_ref = DB.document(u'streaming_files/%s' % file_name)
if _was_already_ingested(db_ref):
_handle_duplication(db_ref)
else:
try:
_insert_into_bigquery(bucket_name, file_name)
_handle_success(db_ref)
except Exception:
_handle_error(db_ref)
接受 CSV 文件:
import json
import csv
import logging
import os
import traceback
from datetime import datetime
from google.api_core import retry
from google.cloud import bigquery
from google.cloud import storage
import pytz
PROJECT_ID = os.getenv('GCP_PROJECT')
BQ_DATASET = 'fromCloudFunction'
BQ_TABLE = 'mytable'
CS = storage.Client()
BQ = bigquery.Client()
def streaming(data, context):
'''This function is executed whenever a file is added to Cloud Storage'''
bucket_name = data['bucket']
file_name = data['name']
newRows = postProcessing(bucket_name, file_name)
# It is recommended that you save
# what you process for debugging reasons.
destination_bucket = 'post-processed' # gs://post-processed/
destination_name = file_name
# saveRowsToBucket(newRows,destination_bucket,destination_name)
rowsInsertIntoBigquery(newRows)
class BigQueryError(Exception):
'''Exception raised whenever a BigQuery error happened'''
def __init__(self, errors):
super().__init__(self._format(errors))
self.errors = errors
def _format(self, errors):
err = []
for error in errors:
err.extend(error['errors'])
return json.dumps(err)
def postProcessing(bucket_name, file_name):
blob = CS.get_bucket(bucket_name).blob(file_name)
my_str = blob.download_as_string().decode('utf-8')
csv_reader = csv.DictReader(my_str.split('\n'))
newRows = []
for row in csv_reader:
modified_row = row # Add your logic
newRows.append(modified_row)
return newRows
def rowsInsertIntoBigquery(rows):
table = BQ.dataset(BQ_DATASET).table(BQ_TABLE)
errors = BQ.insert_rows_json(table,rows)
if errors != []:
raise BigQueryError(errors)
如果需要,仍然需要定义地图(row->newRow) 和函数saveRowsToBucket
。