使用 Python 创建新的 table 并在 bigquery 中设置到期日期
Creating a new table and setting the expiration date in bigquery using Python
这是我的代码,它从 firebase 中提取实时数据库,将其格式化为 Json,上传到云端,然后上传到 BQ。
#standardsql
import json
import boto
import gcs_oauth2_boto_plugin
import os
import shutil
import StringIO
import tempfile
import time
import argparse
import uuid
from firebase import firebase
from google.cloud import storage
from google.cloud.storage import blob
from google.cloud import bigquery
firebase = firebase.FirebaseApplication('https://dataworks-356fa.firebaseio.com/')
result = firebase.get('/connection_info', None)
id_keys = map(str, result.keys())
with open("firetobq.json", "w") as outfile:
for id in id_keys:
json.dump(result[id], outfile, indent=None)
outfile.write("\n")
client = storage.Client(project='dataworks-356fa')
bucket = client.get_bucket('dataworks-356fa-backups')
blob = bucket.blob('firetobq.json')
with open('firetobq.json', 'rb') as f:
blob.upload_from_file(f)
dataset = 'dataworks-356fa'
source = 'gs://dataworks-356fa-backups/firetobq.json'
def load_data_from_gcs(dataset, test12, source):
bigquery_client = bigquery.Client(dataset)
dataset = bigquery_client.dataset('FirebaseArchive')
table = dataset.table('test12')
job_name = str(uuid.uuid4())
job1.create_disposition = 'WRITE_TRUNCATE'
job1.begin()
job= bigquery_client.load_table_from_storage(
job_name, table, "gs://dataworks-356fa-backups/firetobq.json")
job.source_format = 'NEWLINE_DELIMITED_JSON'
job.begin()
wait_for_job(job)
def wait_for_job(job):
while True:
job.reload()
if job.state == 'DONE':
if job.error_result:
raise RuntimeError(job.errors)
return
time.sleep(1)
load_data_from_gcs(dataset, 'test12', source)
我如何将其更改为而不是导入 table test12 中的数据来创建新的 table 并让 table 在 1 周后过期。 (我很确定设置到期日期的命令必须以秒为单位。1 周 = 604800 秒)我知道如何通过命令行设置到期日期,但宁愿在此处自动完成。
这是我在添加 job1 后收到的错误。
Traceback (most recent call last):
File "firebasetobq2.py", line 63, in <module>
load_data_from_gcs(dataset, 'test12', source)
File "firebasetobq2.py", line 44, in load_data_from_gcs
job1.create_disposition = 'WRITE_TRUNCATE'
NameError: global name 'job1' is not defined
如果您想为 table 设置过期时间,这可能会奏效:
from datetime import datetime, timedelta
from google.cloud.bigquery.schema import SchemaField
def load_data_from_gcs(dataset,
table_name,
table_schema,
source,
source_format,
expiration_time):
bigquery_client = bigquery.Client()
dataset = bigquery_client.dataset(dataset)
table = dataset.table(table_name)
table.schema = table_schema
table.expires = expiration_time
if not table.created:
table.create()
job_name = str(uuid.uuid4())
job= bigquery_client.load_table_from_storage(
job_name, table, source)
job.source_format = source_format
job.begin()
wait_for_job(job)
dataset = 'FirebaseArchive'
table_name = 'test12'
gcs_source = 'gs://dataworks-356fa-backups/firetobq.json'
source_format = 'NEWLINE_DELIMITED_JSON'
table.schema = [SchemaField(field1), SchemaField(field2), (...)]
expiration_time = datetime.now() + timedelta(seconds=604800)
load_data_from_gcs(dataset,
table_name,
table_schema,
gcs_source,
source_format,
expiration_time)
请注意,唯一的区别是它设置的代码行:
table.expires = expiration_time
其值必须是datetime
类型(这里定义为expiration_time = datetime.now() + timedelta(seconds=604800)
)
不确定是否可以使用 Python API 使用架构自动检测,但您仍然可以使用 SchemaFields
发送此信息。例如,如果您的 table 有两个字段,user_id
和 job_id
,都是 INTEGERS
,那么架构将是:
table_schema = [SchemaField('user_id', field_type='INT64'),
SchemaField('job_id', field_type='INT64')]
有关模式在 BigQuery 中的工作原理的更多信息,请参阅 。
[编辑]:
刚刚看到你的other question,如果你想截断table然后写入数据,你可以这样做:
job.create_disposition = 'WRITE_TRUNCATE'
job.begin()
在您的 load_data_from_gcs
函数中。这将自动删除 table 并使用存储文件中的数据创建一个新的。您不必为它定义一个模式,因为它之前已经定义过(因此对您来说可能是一个更简单的解决方案)。
这是我的代码,它从 firebase 中提取实时数据库,将其格式化为 Json,上传到云端,然后上传到 BQ。
#standardsql
import json
import boto
import gcs_oauth2_boto_plugin
import os
import shutil
import StringIO
import tempfile
import time
import argparse
import uuid
from firebase import firebase
from google.cloud import storage
from google.cloud.storage import blob
from google.cloud import bigquery
firebase = firebase.FirebaseApplication('https://dataworks-356fa.firebaseio.com/')
result = firebase.get('/connection_info', None)
id_keys = map(str, result.keys())
with open("firetobq.json", "w") as outfile:
for id in id_keys:
json.dump(result[id], outfile, indent=None)
outfile.write("\n")
client = storage.Client(project='dataworks-356fa')
bucket = client.get_bucket('dataworks-356fa-backups')
blob = bucket.blob('firetobq.json')
with open('firetobq.json', 'rb') as f:
blob.upload_from_file(f)
dataset = 'dataworks-356fa'
source = 'gs://dataworks-356fa-backups/firetobq.json'
def load_data_from_gcs(dataset, test12, source):
bigquery_client = bigquery.Client(dataset)
dataset = bigquery_client.dataset('FirebaseArchive')
table = dataset.table('test12')
job_name = str(uuid.uuid4())
job1.create_disposition = 'WRITE_TRUNCATE'
job1.begin()
job= bigquery_client.load_table_from_storage(
job_name, table, "gs://dataworks-356fa-backups/firetobq.json")
job.source_format = 'NEWLINE_DELIMITED_JSON'
job.begin()
wait_for_job(job)
def wait_for_job(job):
while True:
job.reload()
if job.state == 'DONE':
if job.error_result:
raise RuntimeError(job.errors)
return
time.sleep(1)
load_data_from_gcs(dataset, 'test12', source)
我如何将其更改为而不是导入 table test12 中的数据来创建新的 table 并让 table 在 1 周后过期。 (我很确定设置到期日期的命令必须以秒为单位。1 周 = 604800 秒)我知道如何通过命令行设置到期日期,但宁愿在此处自动完成。
这是我在添加 job1 后收到的错误。
Traceback (most recent call last):
File "firebasetobq2.py", line 63, in <module>
load_data_from_gcs(dataset, 'test12', source)
File "firebasetobq2.py", line 44, in load_data_from_gcs
job1.create_disposition = 'WRITE_TRUNCATE'
NameError: global name 'job1' is not defined
如果您想为 table 设置过期时间,这可能会奏效:
from datetime import datetime, timedelta
from google.cloud.bigquery.schema import SchemaField
def load_data_from_gcs(dataset,
table_name,
table_schema,
source,
source_format,
expiration_time):
bigquery_client = bigquery.Client()
dataset = bigquery_client.dataset(dataset)
table = dataset.table(table_name)
table.schema = table_schema
table.expires = expiration_time
if not table.created:
table.create()
job_name = str(uuid.uuid4())
job= bigquery_client.load_table_from_storage(
job_name, table, source)
job.source_format = source_format
job.begin()
wait_for_job(job)
dataset = 'FirebaseArchive'
table_name = 'test12'
gcs_source = 'gs://dataworks-356fa-backups/firetobq.json'
source_format = 'NEWLINE_DELIMITED_JSON'
table.schema = [SchemaField(field1), SchemaField(field2), (...)]
expiration_time = datetime.now() + timedelta(seconds=604800)
load_data_from_gcs(dataset,
table_name,
table_schema,
gcs_source,
source_format,
expiration_time)
请注意,唯一的区别是它设置的代码行:
table.expires = expiration_time
其值必须是datetime
类型(这里定义为expiration_time = datetime.now() + timedelta(seconds=604800)
)
不确定是否可以使用 Python API 使用架构自动检测,但您仍然可以使用 SchemaFields
发送此信息。例如,如果您的 table 有两个字段,user_id
和 job_id
,都是 INTEGERS
,那么架构将是:
table_schema = [SchemaField('user_id', field_type='INT64'),
SchemaField('job_id', field_type='INT64')]
有关模式在 BigQuery 中的工作原理的更多信息,请参阅
[编辑]:
刚刚看到你的other question,如果你想截断table然后写入数据,你可以这样做:
job.create_disposition = 'WRITE_TRUNCATE'
job.begin()
在您的 load_data_from_gcs
函数中。这将自动删除 table 并使用存储文件中的数据创建一个新的。您不必为它定义一个模式,因为它之前已经定义过(因此对您来说可能是一个更简单的解决方案)。