如何将 S3 存储桶中的选定文件转换为雪花阶段,以便使用 python 和 boto3 将数据加载到雪花中
How to convert selected files in S3 bucket into snowflake stage in order to load data into snowflake using python and boto3
我需要暂存 s3 bucket.first 中的所有文件,我找到上传到给定存储桶的最新文件,然后我需要将这些文件放入 stage.not 整个存储桶。例如,假设我有一个名为主题的存储桶。在里面我有 2 个文件夹 topic1 和 topic2。这 2 个文件夹新上传了 2 个 files.in 在这种情况下,我需要将这些新上传的文件放到舞台上,以便将这些数据加载到 snowflake.i 中,我想使用 python 和 boto3 来执行此操作。我已经构建了一个代码来查找最新文件,但我不知道如何将它们设为 stage.when 我对每个文件使用了 CREATE OR REPLACE STAGE 命令和 for 循环它只会为最后一个文件创建一个阶段。不为每个文件创建阶段。我应该怎么做?
` def download_s3_files(self):
s3_object = boto3.client('s3', aws_access_key_id=self.s3_acc_key, aws_secret_access_key=self.s3_sec_key)
if self.source_as_stage:
no_of_dir = []
try:
bucket = s3_object.list_objects(Bucket=self.s3_bucket, Prefix=self.file_path, Delimiter='/')
print("object bucket list >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>", bucket)
except Exception as e:
self.propagate_log_msg('check [%s] and Source File Location Path' % e)
for directory in bucket['CommonPrefixes']:
no_of_dir.append(str(directory['Prefix']).rsplit('/', 2)[-2])
print(no_of_dir)
no_of_dir.sort(reverse=True)
latest_dir = no_of_dir[0]
self.convert_source_as_stage(latest_dir)
except Exception as e:
print(e)
exit(-1)
def convert_source_as_stage(self, latest_file):
source_file_format = str(self.metadata['source_file_format']).lower()+'_format' if self.metadata['source_file_format'] is not None else 'pipe_format'
url = 's3://{bucket}/{location}/{dir_}'.format(location=self.s3_file_loc.strip("/"),
bucket=self.s3_bucket, dir_=latest_file)
print("formateed url>>>>>>>>>>>>>>>>>>", url)
file_name_dw = str(latest_file.rsplit('/', 1)[-1])
print("File_Name>>>>>>>>>>>>>", file_name_dw)
print("Source file format :", source_file_format)
print("source url: ", url)
self.create_stage = """
CREATE OR REPLACE STAGE {sa}.{table} URL='{url}'
CREDENTIALS=(AWS_KEY_ID='{access_key}' AWS_SECRET_KEY='{secret}')
FILE_FORMAT = {file};
// create or replace stage {sa}.{table}
// file_format = (type = 'csv' field_delimiter = '|' record_delimiter = '\n');
""".format(sa=self.ss_cd, table=self.table.lower(), access_key=self.s3_acc_key, secret=self.s3_sec_key,
url=url, file=source_file_format, filename=str(self.metadata['source_table']))
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
'''CONNECT TO SNOWFLAKE''''''''''''''''''''''''''''''''''
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
print("Create Stage Statement :", self.create_stage)
con = snowflake.connector.connect(
user=self.USER,
password=self.PASSWORD,
account=self.ACCOUNT,
)
self.propagate_log_msg("Env metadata = [%s]" % self.env_metadata)
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
'''REFRESH DDL''''''''''''''''''''''''''''''''''
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
try:
file_format_full_path = os.path.join(self.root, 'sql', str(source_file_format)+'.sql')
self.create_file_format = open(file_format_full_path, 'r').read()
self.create_schema = "CREATE schema if not exists {db_lz}.{sa}".format(sa=self.ss_cd, db_lz=self.db_lz)
env_sql = 'USE database {db_lz}'.format(db_lz=self.db_lz)
self.propagate_log_msg(env_sql)
con.cursor().execute(env_sql)
con.cursor().execute(self.create_schema)
env_sql = 'USE schema {schema}'.format(schema=self.ss_cd)
self.propagate_log_msg(env_sql)
con.cursor().execute(env_sql)
con.cursor().execute(self.create_file_format)
con.cursor().execute(self.create_stage)
except snowflake.connector.ProgrammingError as e:
self.propagate_log_msg('Invalid sql, fix sql and retry')
self.propagate_log_msg(e)
exit()
except KeyError:
self.propagate_log_msg(traceback.format_exc())
self.propagate_log_msg('deploy_ods is not set in schedule metadata, assuming it is False')
except Exception as e:
self.propagate_log_msg('unhandled exception, debug')
self.propagate_log_msg(traceback.format_exc())
exit()
else:
self.propagate_log_msg(
"Successfully dropped and recreated table/stage for [{sa}.{table}]".format(sa=self.ss_cd,
table=self.table))`
也许你可以退后一步,更全面地了解你正在努力实现的目标。这将有助于其他人提供好的建议。
最佳做法是为整个存储桶创建一个 雪花STAGE
。 STAGE
对象然后镜像存储桶对象。如果您的设置需要,例如。桶的不同部分有不同的权限,那么创建具有不同访问权限的多个阶段就有意义了。
看起来设置阶段的目的是将 S3 对象导入 Snowflake 表。这是通过 COPY INTO <table>
命令完成的,该命令有两个选项用于选择 objects/filenames 导入:
FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] )
PATTERN = '<regex_pattern>'
我建议您将精力放在 COPY INTO <table>
参数上,而不是在数据库中创建过多的 STAGE
对象。
您还应该认真研究 Snowpipes。 Snowpipes 使用 S3 触发的 COPY INTO <table>
命令近乎实时地将 S3 对象导入 Snowflake 表中,例如。创建对象事件。雪管的成本低于仓库,因为它们不是专用资源。
简单有效。
我需要暂存 s3 bucket.first 中的所有文件,我找到上传到给定存储桶的最新文件,然后我需要将这些文件放入 stage.not 整个存储桶。例如,假设我有一个名为主题的存储桶。在里面我有 2 个文件夹 topic1 和 topic2。这 2 个文件夹新上传了 2 个 files.in 在这种情况下,我需要将这些新上传的文件放到舞台上,以便将这些数据加载到 snowflake.i 中,我想使用 python 和 boto3 来执行此操作。我已经构建了一个代码来查找最新文件,但我不知道如何将它们设为 stage.when 我对每个文件使用了 CREATE OR REPLACE STAGE 命令和 for 循环它只会为最后一个文件创建一个阶段。不为每个文件创建阶段。我应该怎么做?
` def download_s3_files(self):
s3_object = boto3.client('s3', aws_access_key_id=self.s3_acc_key, aws_secret_access_key=self.s3_sec_key)
if self.source_as_stage:
no_of_dir = []
try:
bucket = s3_object.list_objects(Bucket=self.s3_bucket, Prefix=self.file_path, Delimiter='/')
print("object bucket list >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>", bucket)
except Exception as e:
self.propagate_log_msg('check [%s] and Source File Location Path' % e)
for directory in bucket['CommonPrefixes']:
no_of_dir.append(str(directory['Prefix']).rsplit('/', 2)[-2])
print(no_of_dir)
no_of_dir.sort(reverse=True)
latest_dir = no_of_dir[0]
self.convert_source_as_stage(latest_dir)
except Exception as e:
print(e)
exit(-1)
def convert_source_as_stage(self, latest_file):
source_file_format = str(self.metadata['source_file_format']).lower()+'_format' if self.metadata['source_file_format'] is not None else 'pipe_format'
url = 's3://{bucket}/{location}/{dir_}'.format(location=self.s3_file_loc.strip("/"),
bucket=self.s3_bucket, dir_=latest_file)
print("formateed url>>>>>>>>>>>>>>>>>>", url)
file_name_dw = str(latest_file.rsplit('/', 1)[-1])
print("File_Name>>>>>>>>>>>>>", file_name_dw)
print("Source file format :", source_file_format)
print("source url: ", url)
self.create_stage = """
CREATE OR REPLACE STAGE {sa}.{table} URL='{url}'
CREDENTIALS=(AWS_KEY_ID='{access_key}' AWS_SECRET_KEY='{secret}')
FILE_FORMAT = {file};
// create or replace stage {sa}.{table}
// file_format = (type = 'csv' field_delimiter = '|' record_delimiter = '\n');
""".format(sa=self.ss_cd, table=self.table.lower(), access_key=self.s3_acc_key, secret=self.s3_sec_key,
url=url, file=source_file_format, filename=str(self.metadata['source_table']))
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
'''CONNECT TO SNOWFLAKE''''''''''''''''''''''''''''''''''
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
print("Create Stage Statement :", self.create_stage)
con = snowflake.connector.connect(
user=self.USER,
password=self.PASSWORD,
account=self.ACCOUNT,
)
self.propagate_log_msg("Env metadata = [%s]" % self.env_metadata)
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
'''REFRESH DDL''''''''''''''''''''''''''''''''''
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
try:
file_format_full_path = os.path.join(self.root, 'sql', str(source_file_format)+'.sql')
self.create_file_format = open(file_format_full_path, 'r').read()
self.create_schema = "CREATE schema if not exists {db_lz}.{sa}".format(sa=self.ss_cd, db_lz=self.db_lz)
env_sql = 'USE database {db_lz}'.format(db_lz=self.db_lz)
self.propagate_log_msg(env_sql)
con.cursor().execute(env_sql)
con.cursor().execute(self.create_schema)
env_sql = 'USE schema {schema}'.format(schema=self.ss_cd)
self.propagate_log_msg(env_sql)
con.cursor().execute(env_sql)
con.cursor().execute(self.create_file_format)
con.cursor().execute(self.create_stage)
except snowflake.connector.ProgrammingError as e:
self.propagate_log_msg('Invalid sql, fix sql and retry')
self.propagate_log_msg(e)
exit()
except KeyError:
self.propagate_log_msg(traceback.format_exc())
self.propagate_log_msg('deploy_ods is not set in schedule metadata, assuming it is False')
except Exception as e:
self.propagate_log_msg('unhandled exception, debug')
self.propagate_log_msg(traceback.format_exc())
exit()
else:
self.propagate_log_msg(
"Successfully dropped and recreated table/stage for [{sa}.{table}]".format(sa=self.ss_cd,
table=self.table))`
也许你可以退后一步,更全面地了解你正在努力实现的目标。这将有助于其他人提供好的建议。
最佳做法是为整个存储桶创建一个 雪花STAGE
。 STAGE
对象然后镜像存储桶对象。如果您的设置需要,例如。桶的不同部分有不同的权限,那么创建具有不同访问权限的多个阶段就有意义了。
看起来设置阶段的目的是将 S3 对象导入 Snowflake 表。这是通过 COPY INTO <table>
命令完成的,该命令有两个选项用于选择 objects/filenames 导入:
FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] )
PATTERN = '<regex_pattern>'
我建议您将精力放在 COPY INTO <table>
参数上,而不是在数据库中创建过多的 STAGE
对象。
您还应该认真研究 Snowpipes。 Snowpipes 使用 S3 触发的 COPY INTO <table>
命令近乎实时地将 S3 对象导入 Snowflake 表中,例如。创建对象事件。雪管的成本低于仓库,因为它们不是专用资源。
简单有效。