Loading multiple CSV files into a BQ table on an event trigger

I am trying to load multiple CSV files into a table with the code below, but it fails. Can anyone tell me where I am going wrong? ##################

import os
from google.cloud import bigquery

def csv_loader(data, context):
    client = bigquery.Client()
    dataset_id = os.environ['DATASET']
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        write_disposition="WRITE_TRUNCATE",
        skip_leading_rows=1,
    )

    # get the URI for the uploaded CSV in GCS from 'data'
    uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']
    # let's do this
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table(os.environ['TABLE']))

    load_job.result()  # wait for table load to complete
    print('Job finished.')
    destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
    print('Loaded {} rows.'.format(destination_table.num_rows))

###################

The above works fine if a schema is specified for a single file, but for multiple files it gives an error. Can anyone tell me what I am doing wrong?

Based on your last comment, the schemas differ and you want to use schema auto-detection. However, I don't see that flag in your code, nor do I see you passing the job_config variable to the job load method.

Try the following:

Note: I added the flag autodetect=True to the job_config variable, and also passed the job_config variable to the load_table_from_uri() function.

import os
from google.cloud import bigquery

def csv_loader(data, context):
    client = bigquery.Client()
    dataset_id = os.environ['DATASET']
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig(
        autodetect=True,  # let BigQuery infer the schema of each file
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        write_disposition="WRITE_TRUNCATE",
        skip_leading_rows=1,
    )

    # get the URI for the uploaded CSV in GCS from 'data'
    uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']
    # let's do this
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table(os.environ['TABLE']),
        job_config=job_config,
    )

    load_job.result()  # wait for table load to complete
    print('Job finished.')
    destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
    print('Loaded {} rows.'.format(destination_table.num_rows))
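As a side note, the `data` payload a Cloud Storage trigger delivers to the function carries the uploaded object's `name`, which is what the code above concatenates into the gs:// URI. A minimal, standalone sketch of just that step (the bucket and object names below are hypothetical, and `gcs_uri` is a helper introduced here for illustration):

```python
import os

def gcs_uri(data, bucket_env="BUCKET"):
    # Build the gs:// URI for the uploaded object from the trigger payload.
    return "gs://{}/{}".format(os.environ[bucket_env], data["name"])

os.environ["BUCKET"] = "my-bucket"           # hypothetical bucket name
event = {"name": "exports/2024/orders.csv"}  # shape of a GCS finalize event
print(gcs_uri(event))  # gs://my-bucket/exports/2024/orders.csv
```

Because the function receives one event per uploaded object, each CSV triggers its own load job; with WRITE_TRUNCATE, note that every new file replaces the table's previous contents.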