将 pandas 数据帧转换为内存中的类文件对象?
Turn pandas dataframe into a file-like object in memory?
我每天将大约 2 - 250 万条记录加载到 Postgres 数据库中。
然后我使用 pd.read_sql 读取这些数据以将其转换为数据框,然后我进行一些列操作和一些小的合并。我将修改后的数据另存为 table 供其他人使用。
当我做 pd.to_sql 时,它需要很长时间。如果我保存一个 csv 文件并在 Postgres 中使用 COPY FROM,整个过程只需要几分钟,但服务器在另一台机器上,在那里传输文件很痛苦。
使用 psycopg2,看起来我可以使用 copy_expert 从批量复制中获益,但仍然使用 python。如果可能的话,我想避免编写实际的 csv 文件。我可以使用 pandas 数据帧在内存中执行此操作吗?
这是我的 pandas 代码示例。如果可能,我想添加 copy_expert 或其他内容以更快地保存此数据。
for date in required_date_range:
df = pd.read_sql(sql=query, con=pg_engine, params={'x' : date})
...
do stuff to the columns
...
df.to_sql('table_name', pg_engine, index=False, if_exists='append', dtype=final_table_dtypes)
有人可以帮我写示例代码吗?我更愿意仍然使用 pandas 并且在内存中使用它会很好。如果没有,我就写一个csv临时文件,然后这样做。
编辑 - 这是我的最终代码。每个日期(数百万行)只需要几百秒而不是几个小时。
to_sql = """COPY %s FROM STDIN WITH CSV HEADER"""
def process_file(conn, table_name, file_object):
fake_conn = cms_dtypes.pg_engine.raw_connection()
fake_cur = fake_conn.cursor()
fake_cur.copy_expert(sql=to_sql % table_name, file=file_object)
fake_conn.commit()
fake_cur.close()
#after doing stuff to the dataframe
s_buf = io.StringIO()
df.to_csv(s_buf)
process_file(cms_dtypes.pg_engine, 'fact_cms_employee', s_buf)
Python 模块 io
(docs) 具有用于类文件对象的必要工具。
import io
# text buffer
s_buf = io.StringIO()
# saving a data frame to a buffer (same as with a regular file):
df.to_csv(s_buf)
编辑。
(我忘记了)为了以后从缓冲区中读取,它的位置应该设置为开头:
s_buf.seek(0)
我不熟悉 psycopg2
但根据 docs copy_expert
和 copy_from
都可以使用,例如:
cur.copy_from(s_buf, table)
(对于 Python 2,请参阅 StringIO。)
我在执行 ptrj 的解决方案时遇到了问题。
我认为问题源于pandas 将缓冲区的 pos 设置为末尾。
查看如下:
from StringIO import StringIO
df = pd.DataFrame({"name":['foo','bar'],"id":[1,2]})
s_buf = StringIO()
df.to_csv(s_buf)
s_buf.__dict__
# Output
# {'softspace': 0, 'buflist': ['foo,1\n', 'bar,2\n'], 'pos': 12, 'len': 12, 'closed': False, 'buf': ''}
请注意 pos 为 12。我必须将 pos 设置为 0 才能使后续 copy_from 命令起作用
s_buf.pos = 0
cur = conn.cursor()
cur.copy_from(s_buf, tablename, sep=',')
conn.commit()
我每天将大约 2 - 250 万条记录加载到 Postgres 数据库中。
然后我使用 pd.read_sql 读取这些数据以将其转换为数据框,然后我进行一些列操作和一些小的合并。我将修改后的数据另存为 table 供其他人使用。
当我做 pd.to_sql 时,它需要很长时间。如果我保存一个 csv 文件并在 Postgres 中使用 COPY FROM,整个过程只需要几分钟,但服务器在另一台机器上,在那里传输文件很痛苦。
使用 psycopg2,看起来我可以使用 copy_expert 从批量复制中获益,但仍然使用 python。如果可能的话,我想避免编写实际的 csv 文件。我可以使用 pandas 数据帧在内存中执行此操作吗?
这是我的 pandas 代码示例。如果可能,我想添加 copy_expert 或其他内容以更快地保存此数据。
for date in required_date_range:
df = pd.read_sql(sql=query, con=pg_engine, params={'x' : date})
...
do stuff to the columns
...
df.to_sql('table_name', pg_engine, index=False, if_exists='append', dtype=final_table_dtypes)
有人可以帮我写示例代码吗?我更愿意仍然使用 pandas 并且在内存中使用它会很好。如果没有,我就写一个csv临时文件,然后这样做。
编辑 - 这是我的最终代码。每个日期(数百万行)只需要几百秒而不是几个小时。
to_sql = """COPY %s FROM STDIN WITH CSV HEADER"""
def process_file(conn, table_name, file_object):
fake_conn = cms_dtypes.pg_engine.raw_connection()
fake_cur = fake_conn.cursor()
fake_cur.copy_expert(sql=to_sql % table_name, file=file_object)
fake_conn.commit()
fake_cur.close()
#after doing stuff to the dataframe
s_buf = io.StringIO()
df.to_csv(s_buf)
process_file(cms_dtypes.pg_engine, 'fact_cms_employee', s_buf)
Python 模块 io
(docs) 具有用于类文件对象的必要工具。
import io
# text buffer
s_buf = io.StringIO()
# saving a data frame to a buffer (same as with a regular file):
df.to_csv(s_buf)
编辑。 (我忘记了)为了以后从缓冲区中读取,它的位置应该设置为开头:
s_buf.seek(0)
我不熟悉 psycopg2
但根据 docs copy_expert
和 copy_from
都可以使用,例如:
cur.copy_from(s_buf, table)
(对于 Python 2,请参阅 StringIO。)
我在执行 ptrj 的解决方案时遇到了问题。
我认为问题源于pandas 将缓冲区的 pos 设置为末尾。
查看如下:
from StringIO import StringIO
df = pd.DataFrame({"name":['foo','bar'],"id":[1,2]})
s_buf = StringIO()
df.to_csv(s_buf)
s_buf.__dict__
# Output
# {'softspace': 0, 'buflist': ['foo,1\n', 'bar,2\n'], 'pos': 12, 'len': 12, 'closed': False, 'buf': ''}
请注意 pos 为 12。我必须将 pos 设置为 0 才能使后续 copy_from 命令起作用
s_buf.pos = 0
cur = conn.cursor()
cur.copy_from(s_buf, tablename, sep=',')
conn.commit()