将 Pandas DataFrame 作为 Pickle 写入 S3
Write Pandas DataFrame To S3 as Pickle
这是我的要求。
- 将 pandas 数据帧作为 pickle 文件上传到 AWS S3
- 由于环境原因,必须使用boto3,不能选择s3fs等替代方案
- 数据必须存在于内存中,不能写入临时文件
我创建了以下简单函数,将 Pandas 数据帧作为 csv 上传到 s3:
def df_to_s3_csv(df, filename, sep=','):
s3 = boto3.resource('s3')
buffer = io.StringIO()
df.to_csv(buffer, sep=sep, index=False)
s3.Object(s3bucket, f'{s3_upload_path}/{filename}').put(Body=buffer.getvalue())
这个函数工作正常,并且做了它应该做的事情。对于 pickle 文件,我以类似的方式创建了以下函数:
def df_to_s3_pckl(df, filename):
s3 = boto3.resource('s3')
buffer = io.BytesIO()
df.to_pickle(buffer)
buffer.seek(0)
obj = s3.Object(s3bucket, f'{s3_upload_path}/{filename}')
obj.put(Body=buffer.getvalue())
我在有和没有 seek
部分的情况下尝试了这个函数,但无论哪种方式,它都会抛出以下错误:ValueError: I/O operation on closed file.
进一步研究这个问题,我发现一旦调用 df.to_pickle
,buffer
就被认为是 closed
。这可以通过发出这些命令来重现:
buffer = io.BytesIO()
df.to_pickle(buffer)
print(buffer.closed)
以上打印 True
。 BytesIO
缓冲区似乎已被 to_pickle
关闭,因此无法引用其数据。如何解决这个问题,或者是否有满足我要求的替代方案?我在 SO 上发现了几个关于如何使用 boto3 上传到 S3 的问题,但没有关于如何使用 BytesIO 缓冲区上传 Pandas 创建的 pickle 文件的问题。
这是基本问题的最小可重现示例:
import pandas as pd
import numpy as np
import io
df = pd.DataFrame(np.random.randint(0,100,size=(4,4)))
buffer = io.BytesIO()
df.to_pickle(buffer)
print(buffer.closed)
问题似乎可以追溯到 the pandas source code。这最终可能是 pandas 中的一个错误,由 to_pickle
方法中意外使用 BytesIO
对象所揭示。我设法使用以下代码在最小可重现示例中规避了该问题,该代码使用 pickle
模块中的 dump
方法:
import pandas as pd
import numpy as np
import io
from pickle import dump
df = pd.DataFrame(np.random.randint(0,100,size=(4,4)))
buffer = io.BytesIO()
dump(df, buffer)
buffer.seek(0)
print(buffer.closed)
现在 print 语句打印 False
并且可以访问 BytesIO
流数据。
这是我的要求。
- 将 pandas 数据帧作为 pickle 文件上传到 AWS S3
- 由于环境原因,必须使用boto3,不能选择s3fs等替代方案
- 数据必须存在于内存中,不能写入临时文件
我创建了以下简单函数,将 Pandas 数据帧作为 csv 上传到 s3:
def df_to_s3_csv(df, filename, sep=','):
s3 = boto3.resource('s3')
buffer = io.StringIO()
df.to_csv(buffer, sep=sep, index=False)
s3.Object(s3bucket, f'{s3_upload_path}/{filename}').put(Body=buffer.getvalue())
这个函数工作正常,并且做了它应该做的事情。对于 pickle 文件,我以类似的方式创建了以下函数:
def df_to_s3_pckl(df, filename):
s3 = boto3.resource('s3')
buffer = io.BytesIO()
df.to_pickle(buffer)
buffer.seek(0)
obj = s3.Object(s3bucket, f'{s3_upload_path}/{filename}')
obj.put(Body=buffer.getvalue())
我在有和没有 seek
部分的情况下尝试了这个函数,但无论哪种方式,它都会抛出以下错误:ValueError: I/O operation on closed file.
进一步研究这个问题,我发现一旦调用 df.to_pickle
,buffer
就被认为是 closed
。这可以通过发出这些命令来重现:
buffer = io.BytesIO()
df.to_pickle(buffer)
print(buffer.closed)
以上打印 True
。 BytesIO
缓冲区似乎已被 to_pickle
关闭,因此无法引用其数据。如何解决这个问题,或者是否有满足我要求的替代方案?我在 SO 上发现了几个关于如何使用 boto3 上传到 S3 的问题,但没有关于如何使用 BytesIO 缓冲区上传 Pandas 创建的 pickle 文件的问题。
这是基本问题的最小可重现示例:
import pandas as pd
import numpy as np
import io
df = pd.DataFrame(np.random.randint(0,100,size=(4,4)))
buffer = io.BytesIO()
df.to_pickle(buffer)
print(buffer.closed)
问题似乎可以追溯到 the pandas source code。这最终可能是 pandas 中的一个错误,由 to_pickle
方法中意外使用 BytesIO
对象所揭示。我设法使用以下代码在最小可重现示例中规避了该问题,该代码使用 pickle
模块中的 dump
方法:
import pandas as pd
import numpy as np
import io
from pickle import dump
df = pd.DataFrame(np.random.randint(0,100,size=(4,4)))
buffer = io.BytesIO()
dump(df, buffer)
buffer.seek(0)
print(buffer.closed)
现在 print 语句打印 False
并且可以访问 BytesIO
流数据。