将 Pandas DataFrame 作为 Pickle 写入 S3

Write Pandas DataFrame To S3 as Pickle

这是我的要求。

我创建了以下简单函数,将 Pandas 数据帧作为 csv 上传到 s3:

def df_to_s3_csv(df, filename, sep=','):
    s3 = boto3.resource('s3')
    buffer = io.StringIO()
    df.to_csv(buffer, sep=sep, index=False)
    s3.Object(s3bucket, f'{s3_upload_path}/{filename}').put(Body=buffer.getvalue())

这个函数工作正常,并且做了它应该做的事情。对于 pickle 文件,我以类似的方式创建了以下函数:

def df_to_s3_pckl(df, filename):
    s3 = boto3.resource('s3')
    buffer = io.BytesIO()
    df.to_pickle(buffer)
    buffer.seek(0)
    obj = s3.Object(s3bucket, f'{s3_upload_path}/{filename}')
    obj.put(Body=buffer.getvalue())

我在有和没有 seek 部分的情况下尝试了这个函数,但无论哪种方式,它都会抛出以下错误:ValueError: I/O operation on closed file.

进一步研究这个问题,我发现一旦调用 df.to_picklebuffer 就被认为是 closed。这可以通过发出这些命令来重现:

buffer = io.BytesIO()
df.to_pickle(buffer)
print(buffer.closed)

以上打印 TrueBytesIO 缓冲区似乎已被 to_pickle 关闭,因此无法引用其数据。如何解决这个问题,或者是否有满足我要求的替代方案?我在 SO 上发现了几个关于如何使用 boto3 上传到 S3 的问题,但没有关于如何使用 BytesIO 缓冲区上传 Pandas 创建的 pickle 文件的问题。

这是基本问题的最小可重现示例:

import pandas as pd
import numpy as np
import io
df = pd.DataFrame(np.random.randint(0,100,size=(4,4)))                                                                                   
buffer = io.BytesIO()
df.to_pickle(buffer)
print(buffer.closed)

问题似乎可以追溯到 the pandas source code。这最终可能是 pandas 中的一个错误,由 to_pickle 方法中意外使用 BytesIO 对象所揭示。我设法使用以下代码在最小可重现示例中规避了该问题,该代码使用 pickle 模块中的 dump 方法:

import pandas as pd
import numpy as np
import io
from pickle import dump
df = pd.DataFrame(np.random.randint(0,100,size=(4,4)))
buffer = io.BytesIO()
dump(df, buffer)
buffer.seek(0)
print(buffer.closed)

现在 print 语句打印 False 并且可以访问 BytesIO 流数据。