将 Dataframe 保存到 csv 直接到 s3 Python
Save Dataframe to csv directly to s3 Python
我有一个 pandas DataFrame,我想将其上传到新的 CSV 文件。问题是我不想在将文件传输到 s3 之前将其保存在本地。有没有像 to_csv 这样的方法可以直接将数据框写入 s3?我正在使用 boto3。
这是我目前所拥有的:
import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])
# Make alterations to DataFrame
# Then export DataFrame to CSV through direct transfer to s3
如果您将 None
作为第一个参数传递给 to_csv()
,数据将作为字符串返回。从那里可以轻松地将其一次性上传到 S3。
也应该可以将 StringIO
对象传递给 to_csv()
,但使用字符串会更容易。
您可以使用:
from io import StringIO # python3; python2: BytesIO
import boto3
bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())
我喜欢 s3fs,它让您(几乎)像使用本地文件系统一样使用 s3。
你可以这样做:
import s3fs
bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
f.write(bytes_to_write)
s3fs
仅支持 rb
和 wb
模式打开文件,这就是为什么我这样做 bytes_to_write
的原因。
我从存储桶 s3 中读取了一个包含两列的 csv,以及我放入 pandas 数据帧中的文件 csv 的内容。
示例:
config.json
{
"credential": {
"access_key":"xxxxxx",
"secret_key":"xxxxxx"
}
,
"s3":{
"bucket":"mybucket",
"key":"csv/user.csv"
}
}
cls_config.json
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import json
class cls_config(object):
def __init__(self,filename):
self.filename = filename
def getConfig(self):
fileName = os.path.join(os.path.dirname(__file__), self.filename)
with open(fileName) as f:
config = json.load(f)
return config
cls_pandas.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pandas as pd
import io
class cls_pandas(object):
def __init__(self):
pass
def read(self,stream):
df = pd.read_csv(io.StringIO(stream), sep = ",")
return df
cls_s3.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import boto3
import json
class cls_s3(object):
def __init__(self,access_key,secret_key):
self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
def getObject(self,bucket,key):
read_file = self.s3.get_object(Bucket=bucket, Key=key)
body = read_file['Body'].read().decode('utf-8')
return body
test.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from cls_config import *
from cls_s3 import *
from cls_pandas import *
class test(object):
def __init__(self):
self.conf = cls_config('config.json')
def process(self):
conf = self.conf.getConfig()
bucket = conf['s3']['bucket']
key = conf['s3']['key']
access_key = conf['credential']['access_key']
secret_key = conf['credential']['secret_key']
s3 = cls_s3(access_key,secret_key)
ob = s3.getObject(bucket,key)
pa = cls_pandas()
df = pa.read(ob)
print df
if __name__ == '__main__':
test = test()
test.process()
这是一个更新的答案:
import s3fs
s3 = s3fs.S3FileSystem(anon=False)
# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
df.to_csv(f)
StringIO 的问题是它会侵蚀你的记忆。使用此方法,您将文件流式传输到 s3,而不是将其转换为字符串,然后将其写入 s3。在内存中保存 pandas 数据帧及其字符串副本似乎效率很低。
如果你在 ec2 instant 中工作,你可以给它一个 IAM 角色来启用它写入 s3,这样你就不需要直接传递凭证。但是,您也可以通过将凭据传递给 S3FileSystem()
函数来连接到存储桶。请参阅文档:https://s3fs.readthedocs.io/en/latest/
可以直接使用S3路径。我正在使用 Pandas 0.24.1
In [1]: import pandas as pd
In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])
In [3]: df
Out[3]:
a b c
0 1 1 1
1 2 2 2
In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)
In [5]: pd.__version__
Out[5]: '0.24.1'
In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')
In [7]: new_df
Out[7]:
a b c
0 1 1 1
1 2 2 2
S3 File Handling
pandas now uses s3fs for handling S3 connections. This shouldn’t break any code. However, since s3fs is not a required dependency, you will need to install it separately, like boto in prior versions of pandas. GH11915.
由于您正在使用 boto3.client()
,请尝试:
import boto3
from io import StringIO #python3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')
copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')
您还可以使用 AWS Data Wrangler:
import awswrangler as wr
wr.s3.to_csv(
df=df,
path="s3://...",
)
请注意,它将为您处理分段上传,使上传速度更快。
我发现这也可以使用 client
来完成,而不仅仅是 resource
。
from io import StringIO
import boto3
s3 = boto3.client("s3",\
region_name=region_name,\
aws_access_key_id=aws_access_key_id,\
aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')
我找到了一个似乎有效的非常简单的解决方案:
s3 = boto3.client("s3")
s3.put_object(
Body=open("filename.csv").read(),
Bucket="your-bucket",
Key="your-key"
)
希望对您有所帮助!
我用AWS Data Wrangler。例如:
import awswrangler as wr
import pandas as pd
# read a local dataframe
df = pd.read_parquet('my_local_file.gz')
# upload to S3 bucket
wr.s3.to_parquet(df=df, path='s3://mys3bucket/file_name.gz')
这同样适用于 csv 文件。使用具有适当文件扩展名的 read_csv
和 to_csv
,而不是 read_parquet
和 to_parquet
。
您可以使用
- pandas
- boto3
- s3fs(版本≤0.4)
我在路径中使用 to_csv
和 s3://
以及 storage_options
key = "folder/file.csv"
df.to_csv(
f"s3://{YOUR_S3_BUCKET}/{key}",
index=False,
storage_options={
"key": AWS_ACCESS_KEY_ID,
"secret": AWS_SECRET_ACCESS_KEY,
"token": AWS_SESSION_TOKEN,
},
要有效处理大文件,您还可以使用 open-source S3 兼容的 MinIO,它具有 minio
python client package,就像我的这个函数:
import minio
import os
import pandas as pd
minio_client = minio.Minio(..)
def write_df_to_minio(df,
minio_client,
bucket_name,
file_name="new-file.csv",
local_temp_folder="/tmp/",
content_type="application/csv",
sep=",",
save_row_index=False):
df.to_csv(os.path.join(local_temp_folder, file_name), sep=sep, index=save_row_index)
minio_results = minio_client.fput_object(bucket_name=bucket_name,
object_name=file_name,
file_path=os.path.join(local_temp_folder, file_name),
content_type=content_type)
assert minio_results.object_name == file_name
另一种选择是使用 cloudpathlib 执行此操作,它支持 S3 以及 Google 云存储和 Azure Blob 存储。请参阅下面的示例。
import pandas as pd
from cloudpathlib import CloudPath
# read data from S3
df = pd.read_csv(CloudPath("s3://covid19-lake/rearc-covid-19-testing-data/csv/states_daily/states_daily.csv"))
# look at some of the data
df.head(1).T.iloc[:10]
#> 0
#> date 20210307
#> state AK
#> positive 56886.0
#> probableCases NaN
#> negative NaN
#> pending NaN
#> totalTestResultsSource totalTestsViral
#> totalTestResults 1731628.0
#> hospitalizedCurrently 33.0
#> hospitalizedCumulative 1293.0
# writing to S3
with CloudPath("s3://bucket-you-can-write-to/data.csv").open("w") as f:
df.to_csv(f)
CloudPath("s3://bucket-you-can-write-to/data.csv").exists()
#> True
请注意,由于 pandas 处理传递给它的 paths/handles 的方式,您不能直接调用 df.to_csv(CloudPath("s3://drivendata-public-assets/test-asdf2.csv"))
。相反,您需要打开文件进行写入并将该句柄直接传递给 to_csv
.
这在设置方面带来了一些额外的好处 particular options or different authentication mechanisms or keeping a persistent cache 因此您不必总是从 S3 重新下载。
我有一个 pandas DataFrame,我想将其上传到新的 CSV 文件。问题是我不想在将文件传输到 s3 之前将其保存在本地。有没有像 to_csv 这样的方法可以直接将数据框写入 s3?我正在使用 boto3。
这是我目前所拥有的:
import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])
# Make alterations to DataFrame
# Then export DataFrame to CSV through direct transfer to s3
如果您将 None
作为第一个参数传递给 to_csv()
,数据将作为字符串返回。从那里可以轻松地将其一次性上传到 S3。
也应该可以将 StringIO
对象传递给 to_csv()
,但使用字符串会更容易。
您可以使用:
from io import StringIO # python3; python2: BytesIO
import boto3
bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())
我喜欢 s3fs,它让您(几乎)像使用本地文件系统一样使用 s3。
你可以这样做:
import s3fs
bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
f.write(bytes_to_write)
s3fs
仅支持 rb
和 wb
模式打开文件,这就是为什么我这样做 bytes_to_write
的原因。
我从存储桶 s3 中读取了一个包含两列的 csv,以及我放入 pandas 数据帧中的文件 csv 的内容。
示例:
config.json
{
"credential": {
"access_key":"xxxxxx",
"secret_key":"xxxxxx"
}
,
"s3":{
"bucket":"mybucket",
"key":"csv/user.csv"
}
}
cls_config.json
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import json
class cls_config(object):
def __init__(self,filename):
self.filename = filename
def getConfig(self):
fileName = os.path.join(os.path.dirname(__file__), self.filename)
with open(fileName) as f:
config = json.load(f)
return config
cls_pandas.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pandas as pd
import io
class cls_pandas(object):
def __init__(self):
pass
def read(self,stream):
df = pd.read_csv(io.StringIO(stream), sep = ",")
return df
cls_s3.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import boto3
import json
class cls_s3(object):
def __init__(self,access_key,secret_key):
self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
def getObject(self,bucket,key):
read_file = self.s3.get_object(Bucket=bucket, Key=key)
body = read_file['Body'].read().decode('utf-8')
return body
test.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from cls_config import *
from cls_s3 import *
from cls_pandas import *
class test(object):
def __init__(self):
self.conf = cls_config('config.json')
def process(self):
conf = self.conf.getConfig()
bucket = conf['s3']['bucket']
key = conf['s3']['key']
access_key = conf['credential']['access_key']
secret_key = conf['credential']['secret_key']
s3 = cls_s3(access_key,secret_key)
ob = s3.getObject(bucket,key)
pa = cls_pandas()
df = pa.read(ob)
print df
if __name__ == '__main__':
test = test()
test.process()
这是一个更新的答案:
import s3fs
s3 = s3fs.S3FileSystem(anon=False)
# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
df.to_csv(f)
StringIO 的问题是它会侵蚀你的记忆。使用此方法,您将文件流式传输到 s3,而不是将其转换为字符串,然后将其写入 s3。在内存中保存 pandas 数据帧及其字符串副本似乎效率很低。
如果你在 ec2 instant 中工作,你可以给它一个 IAM 角色来启用它写入 s3,这样你就不需要直接传递凭证。但是,您也可以通过将凭据传递给 S3FileSystem()
函数来连接到存储桶。请参阅文档:https://s3fs.readthedocs.io/en/latest/
可以直接使用S3路径。我正在使用 Pandas 0.24.1
In [1]: import pandas as pd
In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])
In [3]: df
Out[3]:
a b c
0 1 1 1
1 2 2 2
In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)
In [5]: pd.__version__
Out[5]: '0.24.1'
In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')
In [7]: new_df
Out[7]:
a b c
0 1 1 1
1 2 2 2
S3 File Handling
pandas now uses s3fs for handling S3 connections. This shouldn’t break any code. However, since s3fs is not a required dependency, you will need to install it separately, like boto in prior versions of pandas. GH11915.
由于您正在使用 boto3.client()
,请尝试:
import boto3
from io import StringIO #python3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')
copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')
您还可以使用 AWS Data Wrangler:
import awswrangler as wr
wr.s3.to_csv(
df=df,
path="s3://...",
)
请注意,它将为您处理分段上传,使上传速度更快。
我发现这也可以使用 client
来完成,而不仅仅是 resource
。
from io import StringIO
import boto3
s3 = boto3.client("s3",\
region_name=region_name,\
aws_access_key_id=aws_access_key_id,\
aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')
我找到了一个似乎有效的非常简单的解决方案:
s3 = boto3.client("s3")
s3.put_object(
Body=open("filename.csv").read(),
Bucket="your-bucket",
Key="your-key"
)
希望对您有所帮助!
我用AWS Data Wrangler。例如:
import awswrangler as wr
import pandas as pd
# read a local dataframe
df = pd.read_parquet('my_local_file.gz')
# upload to S3 bucket
wr.s3.to_parquet(df=df, path='s3://mys3bucket/file_name.gz')
这同样适用于 csv 文件。使用具有适当文件扩展名的 read_csv
和 to_csv
,而不是 read_parquet
和 to_parquet
。
您可以使用
- pandas
- boto3
- s3fs(版本≤0.4)
我在路径中使用 to_csv
和 s3://
以及 storage_options
key = "folder/file.csv"
df.to_csv(
f"s3://{YOUR_S3_BUCKET}/{key}",
index=False,
storage_options={
"key": AWS_ACCESS_KEY_ID,
"secret": AWS_SECRET_ACCESS_KEY,
"token": AWS_SESSION_TOKEN,
},
要有效处理大文件,您还可以使用 open-source S3 兼容的 MinIO,它具有 minio
python client package,就像我的这个函数:
import minio
import os
import pandas as pd
minio_client = minio.Minio(..)
def write_df_to_minio(df,
minio_client,
bucket_name,
file_name="new-file.csv",
local_temp_folder="/tmp/",
content_type="application/csv",
sep=",",
save_row_index=False):
df.to_csv(os.path.join(local_temp_folder, file_name), sep=sep, index=save_row_index)
minio_results = minio_client.fput_object(bucket_name=bucket_name,
object_name=file_name,
file_path=os.path.join(local_temp_folder, file_name),
content_type=content_type)
assert minio_results.object_name == file_name
另一种选择是使用 cloudpathlib 执行此操作,它支持 S3 以及 Google 云存储和 Azure Blob 存储。请参阅下面的示例。
import pandas as pd
from cloudpathlib import CloudPath
# read data from S3
df = pd.read_csv(CloudPath("s3://covid19-lake/rearc-covid-19-testing-data/csv/states_daily/states_daily.csv"))
# look at some of the data
df.head(1).T.iloc[:10]
#> 0
#> date 20210307
#> state AK
#> positive 56886.0
#> probableCases NaN
#> negative NaN
#> pending NaN
#> totalTestResultsSource totalTestsViral
#> totalTestResults 1731628.0
#> hospitalizedCurrently 33.0
#> hospitalizedCumulative 1293.0
# writing to S3
with CloudPath("s3://bucket-you-can-write-to/data.csv").open("w") as f:
df.to_csv(f)
CloudPath("s3://bucket-you-can-write-to/data.csv").exists()
#> True
请注意,由于 pandas 处理传递给它的 paths/handles 的方式,您不能直接调用 df.to_csv(CloudPath("s3://drivendata-public-assets/test-asdf2.csv"))
。相反,您需要打开文件进行写入并将该句柄直接传递给 to_csv
.
这在设置方面带来了一些额外的好处 particular options or different authentication mechanisms or keeping a persistent cache 因此您不必总是从 S3 重新下载。