将 pandas 数据框转换为 parquet 格式并上传到 s3 存储桶
Convert pandas dataframe to parquet format and upload to s3 bucket
我有一个 parquet 文件列表,我需要将它们从一个 S3 存储桶复制到另一个帐户中的另一个 s3 存储桶。在我上传之前,我必须在镶木地板文件中添加几列。
我正在尝试将文件读取到 pandas 数据框,我正在添加列并将其转换为镶木地板,但它似乎不起作用。
这就是我正在尝试的。
my_parquet_list 是我获取所有键列表的地方。
for file in my_parquet_list:
bucket = 'source_bucket_name'
buffer = io.BytesIO()
s3 = session.resource('s3')
s3_obj = s3.Object(bucket,file)
s3_obj.download_fileobj(buffer)
df = pd.read_parquet(buffer)
df["col_new"] = 'xyz'
df["date"] = datetime.datetime.utcnow()
df.to_parquet(buffer, engine= 'pyarrow', index = False)
bucketdest = 'dest_bucket_name'
s3_file = 's3_folder_path/'+'.parquet'
print(s3_file)
s3.Object(bucketdest, s3_file).put(Body=buffer.getvalue())
print('loaded')
只需pip install s3fs
,然后配置你的aws CLI,最后你就可以使用df.to_parquet('s3://bucket_name/output-dir/df.parquet.gzip',index=False)
我有一个 parquet 文件列表,我需要将它们从一个 S3 存储桶复制到另一个帐户中的另一个 s3 存储桶。在我上传之前,我必须在镶木地板文件中添加几列。 我正在尝试将文件读取到 pandas 数据框,我正在添加列并将其转换为镶木地板,但它似乎不起作用。
这就是我正在尝试的。 my_parquet_list 是我获取所有键列表的地方。
for file in my_parquet_list:
bucket = 'source_bucket_name'
buffer = io.BytesIO()
s3 = session.resource('s3')
s3_obj = s3.Object(bucket,file)
s3_obj.download_fileobj(buffer)
df = pd.read_parquet(buffer)
df["col_new"] = 'xyz'
df["date"] = datetime.datetime.utcnow()
df.to_parquet(buffer, engine= 'pyarrow', index = False)
bucketdest = 'dest_bucket_name'
s3_file = 's3_folder_path/'+'.parquet'
print(s3_file)
s3.Object(bucketdest, s3_file).put(Body=buffer.getvalue())
print('loaded')
只需pip install s3fs
,然后配置你的aws CLI,最后你就可以使用df.to_parquet('s3://bucket_name/output-dir/df.parquet.gzip',index=False)