Loading multiple csv into BQ table on event trigger
I am trying to load multiple CSV files into a table with the code below, but it fails.
Can anyone tell me where I am going wrong?
##################
import os
from google.cloud import bigquery

def csv_loader(data, context):
    client = bigquery.Client()
    dataset_id = os.environ['DATASET']
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        write_disposition="WRITE_TRUNCATE",
        skip_leading_rows=1,
    )

    # get the URI for uploaded CSV in GCS from 'data'
    uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']

    # lets do this
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table(os.environ['TABLE'])
    )
    load_job.result()  # wait for table load to complete
    print('Job finished.')
    destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
    print('Loaded {} rows.'.format(destination_table.num_rows))
###################
The above works fine when a schema is specified for a single file, but it gives an error for multiple files. Can anyone tell me what I am doing wrong?
Based on your last comment, the schemas differ and you want to use schema auto-detection. However, I don't see that flag in your code, nor do I see you passing the job_config variable to the load method.

Try the following:

Note: I added the flag autodetect=True to the job_config variable, and I also passed the job_config variable to the load_table_from_uri() function.
import os
from google.cloud import bigquery

def csv_loader(data, context):
    client = bigquery.Client()
    dataset_id = os.environ['DATASET']
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        write_disposition="WRITE_TRUNCATE",
        skip_leading_rows=1,
    )

    # get the URI for uploaded CSV in GCS from 'data'
    uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']

    # lets do this
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table(os.environ['TABLE']),
        job_config=job_config
    )
    load_job.result()  # wait for table load to complete
    print('Job finished.')
    destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
    print('Loaded {} rows.'.format(destination_table.num_rows))