将 pandas 数据帧转换为内存中的类文件对象?

Turn pandas dataframe into a file-like object in memory?

我每天将大约 2 - 250 万条记录加载到 Postgres 数据库中。

然后我使用 pd.read_sql 读取这些数据以将其转换为数据框,然后我进行一些列操作和一些小的合并。我将修改后的数据另存为 table 供其他人使用。

当我做 pd.to_sql 时,它需要很长时间。如果我保存一个 csv 文件并在 Postgres 中使用 COPY FROM,整个过程只需要几分钟,但服务器在另一台机器上,在那里传输文件很痛苦。

使用 psycopg2,看起来我可以使用 copy_expert 从批量复制中获益,但仍然使用 python。如果可能的话,我想避免编写实际的 csv 文件。我可以使用 pandas 数据帧在内存中执行此操作吗?

这是我的 pandas 代码示例。如果可能,我想添加 copy_expert 或其他内容以更快地保存此数据。

    for date in required_date_range:
        df = pd.read_sql(sql=query, con=pg_engine, params={'x' : date})
        ...
        do stuff to the columns
        ...
        df.to_sql('table_name', pg_engine, index=False, if_exists='append',  dtype=final_table_dtypes)

有人可以帮我写示例代码吗?我更愿意仍然使用 pandas 并且在内存中使用它会很好。如果没有,我就写一个csv临时文件,然后这样做。

编辑 - 这是我的最终代码。每个日期(数百万行)只需要几百秒而不是几个小时。

to_sql = """COPY %s FROM STDIN WITH CSV HEADER"""

def process_file(conn, table_name, file_object):
    fake_conn = cms_dtypes.pg_engine.raw_connection()
    fake_cur = fake_conn.cursor()
    fake_cur.copy_expert(sql=to_sql % table_name, file=file_object)
    fake_conn.commit()
    fake_cur.close()


#after doing stuff to the dataframe
    s_buf = io.StringIO()
    df.to_csv(s_buf) 
    process_file(cms_dtypes.pg_engine, 'fact_cms_employee', s_buf)

Python 模块 io(docs) 具有用于类文件对象的必要工具。

import io

# text buffer
s_buf = io.StringIO()

# saving a data frame to a buffer (same as with a regular file):
df.to_csv(s_buf)

编辑。 (我忘记了)为了以后从缓冲区中读取,它的位置应该设置为开头:

s_buf.seek(0)

我不熟悉 psycopg2 但根据 docs copy_expertcopy_from 都可以使用,例如:

cur.copy_from(s_buf, table)

(对于 Python 2,请参阅 StringIO。)

我在执行 ptrj 的解决方案时遇到了问题。

我认为问题源于pandas 将缓冲区的 pos 设置为末尾。

查看如下:

from StringIO import StringIO
df = pd.DataFrame({"name":['foo','bar'],"id":[1,2]})
s_buf = StringIO()
df.to_csv(s_buf)
s_buf.__dict__

# Output
# {'softspace': 0, 'buflist': ['foo,1\n', 'bar,2\n'], 'pos': 12, 'len': 12, 'closed': False, 'buf': ''}

请注意 pos 为 12。我必须将 pos 设置为 0 才能使后续 copy_from 命令起作用

s_buf.pos = 0
cur = conn.cursor()
cur.copy_from(s_buf, tablename, sep=',')
conn.commit()