Write Pandas DataFrame to_csv to a StringIO instead of a file
Objective: this code reads an existing CSV file from a specified S3 bucket into a DataFrame, filters the DataFrame down to the desired columns, and then writes the filtered DataFrame to a CSV object using StringIO so that it can be uploaded to a different S3 bucket.
Everything is working right now except the code block for the function "prepare_file_for_upload". Here is the full code block:
from io import StringIO
import io  # unused at the moment
import logging
import pandas as pd
import boto3
from botocore.exceptions import ClientError

FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

# S3 parameters
source_bucket = 'REPLACE'
source_folder = 'REPLACE/'
dest_bucket = 'REPLACE'
dest_folder = 'REPLACE'
output_name = 'REPLACE'

def get_file_name():
    try:
        s3 = boto3.client("s3")
        logging.info(f'Determining filename from: {source_bucket}/{source_folder}')
        bucket_path = s3.list_objects(Bucket=source_bucket, Prefix=source_folder)
        file_name = [key['Key'] for key in bucket_path['Contents']][1]
        logging.info(file_name)
        return file_name
    except ClientError as e:
        logging.info(f'Unable to determine file name from bucket {source_bucket}/{source_folder}')
        logging.info(e)

def get_file_data(file_name):
    try:
        s3 = boto3.client("s3")
        logging.info(f'file name from get data: {file_name}')
        obj = s3.get_object(Bucket=source_bucket, Key=file_name)
        body = obj['Body']
        body_string = body.read().decode('utf-8')
        file_data = pd.read_csv(StringIO(body_string))
        #logging.info(file_data)
        return file_data
    except ClientError as e:
        logging.info(f'Unable to read {file_name} into dataframe')
        logging.info(e)

def filter_file_data(file_data):
    try:
        all_columns = list(file_data.columns)
        columns_used = ('col_1', 'col_2', 'col_3')
        desired_columns = [x for x in all_columns if x in columns_used]
        filtered_data = file_data[desired_columns]
        logging.info(type(filtered_data))  # for testing
        return filtered_data
    except Exception as e:
        logging.info('Unable to filter file')
        logging.info(e)
The block below is where I am trying to use the "to_csv" method with StringIO to write the existing DF passed into the function, instead of creating a local file. to_csv will write to a local file, but it does not work with the buffer (and yes, I tried placing the buffer cursor back at the start position afterwards, but still nothing):
def prepare_file_for_upload(filtered_data):  # this is the function block where I am stuck
    try:
        buffer = StringIO()
        output_name = 'FILE_NAME.csv'
        # code below is writing to a file but I can not get it to write to the buffer
        output_file = filtered_data.to_csv(buffer, sep=',')
        df = pd.DataFrame(buffer)  # for testing
        logging.info(df)  # for testing
        return output_file
    except Exception as e:
        logging.info(f'Unable to prepare {output_name} for upload')
        logging.info(e)

def upload_file(adjusted_file):
    try:
        #dest_key = f'{dest_folder}/{output_name}'
        dest_key = f'{output_name}'
        s3 = boto3.resource('s3')
        s3.meta.client.upload_file(adjusted_file, dest_bucket, dest_key)
    except ClientError as e:
        logging.info(f'Unable to upload {output_name} to {dest_key}')
        logging.info(e)

def execute_program():
    file_name = get_file_name()
    file_data = get_file_data(file_name)
    filtered_data = filter_file_data(file_data)
    adjusted_file = prepare_file_for_upload(filtered_data)
    upload_file(adjusted_file)

if __name__ == '__main__':
    execute_program()
The following solution worked for me:
csv_buffer = StringIO()
filtered_data.to_csv(csv_buffer)  # to_csv() returns None when given a buffer
s3_resource = boto3.resource('s3')
s3_resource.Object(dest_bucket, output_name).put(Body=csv_buffer.getvalue())
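Folded back into the question's own functions, that fix looks roughly like the sketch below. This is a minimal sketch, not a drop-in replacement: it assumes the same module-level dest_bucket and output_name values, has prepare_file_for_upload return the buffer itself, and has upload_file call put() instead of upload_file() (which expects a path to a local file):

def prepare_file_for_upload(filtered_data):
    try:
        buffer = StringIO()
        # the CSV text lands in the buffer; to_csv() itself returns None
        filtered_data.to_csv(buffer, sep=',')
        return buffer
    except Exception as e:
        logging.info(f'Unable to prepare {output_name} for upload')
        logging.info(e)

def upload_file(buffer):
    try:
        dest_key = output_name
        s3 = boto3.resource('s3')
        # put() accepts the CSV text directly, so no local file is involved
        s3.Object(dest_bucket, dest_key).put(Body=buffer.getvalue())
    except ClientError as e:
        logging.info(f'Unable to upload {output_name} to {dest_bucket}')
        logging.info(e)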
When working with a BytesIO object, pay careful attention to the order of operations. In your code, you instantiate the BytesIO object and then fill it via a call to to_csv(). So far so good. But one thing to manage when working with a BytesIO object, unlike a file-based workflow, is the stream position.
After writing data to the stream, the stream position is at the end of the stream. If you try to write out from that position, you will likely write nothing! The operation will complete, leaving you scratching your head as to why no results were written to S3. Add a call to seek() with the argument 0 to your function. Here is a demo program that demonstrates this:
from io import BytesIO
import boto3
import pandas
from pandas import util

# makeMixedDataFrame() builds a small demo DataFrame
df = util.testing.makeMixedDataFrame()
s3_resource = boto3.resource("s3")
buffer = BytesIO()
df.to_csv(buffer, sep=",", index=False, mode="wb", encoding="UTF-8")

# tell() returns the current stream position; 0 is the beginning of the file
buffer.tell()
>> 134

# Reposition the stream to the beginning by calling seek(0) before uploading
buffer.seek(0)
s3_resource.Object("test-bucket", "test_df_from_resource.csv").put(Body=buffer.getvalue())
You should get a response similar to the following (with actual values):
>> {'ResponseMetadata': {'RequestId': 'request-id-value',
'HostId': '###########',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amz-id-2': '############',
'x-amz-request-id': '00000',
'date': 'Tue, 31 Aug 2021 00:00:00 GMT',
'x-amz-server-side-encryption': 'value',
'etag': '"xxxx"',
'server': 'AmazonS3',
'content-length': '0'},
'RetryAttempts': 0},
'ETag': '"xxxx"',
'ServerSideEncryption': 'value'}
Changing your code to move the stream position should resolve the issue you are facing. It is also worth mentioning that Pandas had a bug that caused unexpected behavior when writing to a bytes object. It was fixed, and the sample I provided assumes you are running a version of Python greater than 3.8 and a version of Pandas greater than 1.3.2. Further information on IO can be found in the Python documentation.
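As an aside, whether the seek(0) is strictly required depends on how the bytes leave the buffer: getvalue() returns a snapshot of the entire buffer regardless of the stream position, while APIs that read from the stream, such as boto3's upload_fileobj(), start at the current position and would upload an empty object without the rewind. A minimal sketch of that second path (the bucket and key names are placeholders):

from io import BytesIO
import boto3
import pandas as pd

df = pd.DataFrame({'col_1': [1, 2], 'col_2': [3, 4]})

buffer = BytesIO()
df.to_csv(buffer, index=False, mode='wb', encoding='UTF-8')

# upload_fileobj() reads from the current stream position, which is now the
# end of the buffer, so rewind first or the uploaded object will be empty
buffer.seek(0)
s3 = boto3.client('s3')
s3.upload_fileobj(buffer, 'test-bucket', 'test_df.csv')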