如何将 S3 存储桶中的选定文件转换为雪花阶段,以便使用 python 和 boto3 将数据加载到雪花中

How to convert selected files in S3 bucket into snowflake stage in order to load data into snowflake using python and boto3

我需要暂存 s3 bucket.first 中的所有文件,我找到上传到给定存储桶的最新文件,然后我需要将这些文件放入 stage.not 整个存储桶。例如,假设我有一个名为主题的存储桶。在里面我有 2 个文件夹 topic1 和 topic2。这 2 个文件夹新上传了 2 个 files.in 在这种情况下,我需要将这些新上传的文件放到舞台上,以便将这些数据加载到 snowflake.i 中,我想使用 python 和 boto3 来执行此操作。我已经构建了一个代码来查找最新文件,但我不知道如何将它们设为 stage.when 我对每个文件使用了 CREATE OR REPLACE STAGE 命令和 for 循环它只会为最后一个文件创建一个阶段。不为每个文件创建阶段。我应该怎么做?

` def download_s3_files(self):

s3_object = boto3.client('s3', aws_access_key_id=self.s3_acc_key, aws_secret_access_key=self.s3_sec_key)

    if self.source_as_stage:

        no_of_dir = []

        try:
            bucket = s3_object.list_objects(Bucket=self.s3_bucket, Prefix=self.file_path, Delimiter='/')
            print("object bucket list >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>", bucket)
        except Exception as e:
            self.propagate_log_msg('check [%s] and Source File Location Path' % e)

        for directory in bucket['CommonPrefixes']:
            no_of_dir.append(str(directory['Prefix']).rsplit('/', 2)[-2])

        print(no_of_dir)

        no_of_dir.sort(reverse=True)
        latest_dir = no_of_dir[0]

        self.convert_source_as_stage(latest_dir)

    except Exception as e:
        print(e)
        exit(-1)

def convert_source_as_stage(self, latest_file):

    source_file_format = str(self.metadata['source_file_format']).lower()+'_format' if self.metadata['source_file_format'] is not None else 'pipe_format'
    url = 's3://{bucket}/{location}/{dir_}'.format(location=self.s3_file_loc.strip("/"),
                                                   bucket=self.s3_bucket, dir_=latest_file)
    print("formateed url>>>>>>>>>>>>>>>>>>", url)
    file_name_dw = str(latest_file.rsplit('/', 1)[-1])

    print("File_Name>>>>>>>>>>>>>", file_name_dw)
    print("Source file format :", source_file_format)
    print("source url: ", url)

    self.create_stage = """

                  CREATE OR REPLACE STAGE {sa}.{table} URL='{url}'
                  CREDENTIALS=(AWS_KEY_ID='{access_key}' AWS_SECRET_KEY='{secret}')
                  FILE_FORMAT = {file};
                  // create or replace stage {sa}.{table}
                  //   file_format = (type = 'csv' field_delimiter = '|' record_delimiter = '\n');

                  """.format(sa=self.ss_cd, table=self.table.lower(), access_key=self.s3_acc_key, secret=self.s3_sec_key,
                             url=url, file=source_file_format, filename=str(self.metadata['source_table']))

    """"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
            '''CONNECT TO SNOWFLAKE''''''''''''''''''''''''''''''''''
    """""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

    print("Create Stage Statement :", self.create_stage)

    con = snowflake.connector.connect(
        user=self.USER,
        password=self.PASSWORD,
        account=self.ACCOUNT,
    )

    self.propagate_log_msg("Env metadata = [%s]" % self.env_metadata)

    """"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
    '''REFRESH DDL''''''''''''''''''''''''''''''''''
    """""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
    try:

        file_format_full_path = os.path.join(self.root, 'sql', str(source_file_format)+'.sql')
        self.create_file_format = open(file_format_full_path, 'r').read()

        self.create_schema = "CREATE schema if not exists {db_lz}.{sa}".format(sa=self.ss_cd, db_lz=self.db_lz)

        env_sql = 'USE database {db_lz}'.format(db_lz=self.db_lz)
        self.propagate_log_msg(env_sql)
        con.cursor().execute(env_sql)
        con.cursor().execute(self.create_schema)

        env_sql = 'USE schema {schema}'.format(schema=self.ss_cd)
        self.propagate_log_msg(env_sql)
        con.cursor().execute(env_sql)

        con.cursor().execute(self.create_file_format)
        con.cursor().execute(self.create_stage)

    except snowflake.connector.ProgrammingError as e:
        self.propagate_log_msg('Invalid sql, fix sql and retry')
        self.propagate_log_msg(e)
        exit()
    except KeyError:
        self.propagate_log_msg(traceback.format_exc())
        self.propagate_log_msg('deploy_ods is not set in schedule metadata, assuming it is False')
    except Exception as e:
        self.propagate_log_msg('unhandled exception, debug')
        self.propagate_log_msg(traceback.format_exc())
        exit()
    else:
        self.propagate_log_msg(
            "Successfully dropped and recreated table/stage for [{sa}.{table}]".format(sa=self.ss_cd,
                                                                                  table=self.table))`

也许你可以退后一步,更全面地了解你正在努力实现的目标。这将有助于其他人提供好的建议。

最佳做法是为整个存储桶创建一个 雪花STAGESTAGE 对象然后镜像存储桶对象。如果您的设置需要,例如。桶的不同部分有不同的权限,那么创建具有不同访问权限的多个阶段就有意义了。

看起来设置阶段的目的是将 S3 对象导入 Snowflake 表。这是通过 COPY INTO <table> 命令完成的,该命令有两个选项用于选择 objects/filenames 导入:

  1. FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] )
  2. PATTERN = '<regex_pattern>'

我建议您将精力放在 COPY INTO <table> 参数上,而不是在数据库中创建过多的 STAGE 对象。

您还应该认真研究 Snowpipes。 Snowpipes 使用 S3 触发的 COPY INTO <table> 命令近乎实时地将 S3 对象导入 Snowflake 表中,例如。创建对象事件。雪管的成本低于仓库,因为它们不是专用资源。
简单有效。