如何将 pandas DataFrame upsert(插入或更新)到 PostgreSQL 表?
How to upsert pandas DataFrame to PostgreSQL table?
我从网络资源中抓取了一些数据并将其全部存储在 pandas DataFrame 中。现在,为了利用 SQLAlchemy 提供的强大的数据库工具,我想将所述 DataFrame 转换为 Table() 对象,并最终将所有数据更新到 PostgreSQL table 中。如果可行,完成这项任务的可行方法是什么?
如果您已经有一个 pandas 数据框,您可以使用 df.to_sql 直接通过 SQLAlchemy 推送数据
from sqlalchemy import create_engine

# Engine built from the PostgreSQL connection URI (psycopg2 driver).
cnxn = create_engine(
    "postgresql+psycopg2://username:password@host:port/database"
)

# Push the DataFrame `df` into table "my_table" of schema "myschema".
df.to_sql(name="my_table", con=cnxn, schema="myschema")
如果您使用的是 PostgreSQL 9.5 或更高版本,您可以使用临时 table 和 INSERT ... ON CONFLICT
语句执行 UPSERT:
import pandas as pd
import sqlalchemy as sa

# … (`engine` is expected to be created here — presumably via
# sa.create_engine(...); that part is elided in this example)

# Everything runs on one connection inside a single transaction, so the
# temporary table is visible to every step and the whole upsert is atomic.
with engine.begin() as conn:
    # step 0.0 - create test environment
    conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
    conn.exec_driver_sql(
        "CREATE TABLE main_table (id int primary key, txt varchar(50))"
    )
    conn.exec_driver_sql(
        "INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
    )

    # step 0.1 - create DataFrame to UPSERT
    df = pd.DataFrame(
        [(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
    )

    # step 1 - create temporary table and upload DataFrame
    # ON COMMIT DROP removes the staging table when the transaction ends;
    # without it the table stays on the pooled connection and a second run
    # fails because temp_table already exists.
    conn.exec_driver_sql(
        "CREATE TEMPORARY TABLE temp_table ON COMMIT DROP "
        "AS SELECT * FROM main_table WHERE false"
    )
    df.to_sql("temp_table", conn, index=False, if_exists="append")

    # step 2 - merge temp_table into main_table
    conn.exec_driver_sql(
        """\
        INSERT INTO main_table (id, txt)
        SELECT id, txt FROM temp_table
        ON CONFLICT (id) DO
            UPDATE SET txt = EXCLUDED.txt
        """
    )

    # step 3 - confirm results
    result = conn.exec_driver_sql("SELECT * FROM main_table ORDER BY id").all()
    print(result)  # [(1, 'row 1 new text'), (2, 'new row 2 text')]
下面是我的代码,用于把 pandas DataFrame 中的数据批量插入 PostgreSQL,并在发生主键冲突时更新已有的行(INSERT ... ON CONFLICT DO UPDATE):
假设 id 是 postgresql table 和 pandas df 的唯一键,你想根据这个 id 插入和更新。
import pandas as pd
from sqlalchemy import create_engine, text

# The connection URL must be a string literal.
engine = create_engine("postgresql://username:pass@host:port/dbname")

# Values are sent as bound parameters instead of being spliced into the SQL
# text, so quotes inside the data cannot break (or inject into) the statement.
query = text(
    """
    INSERT INTO schema.table (name, title, id)
    VALUES (:name, :title, :id)
    ON CONFLICT (id)
    DO UPDATE SET name = excluded.name,
                  title = excluded.title
    """
)

# One dict per row, restricted to the columns the statement binds by name.
records = df[["name", "title", "id"]].to_dict("records")

# Skip the round trip for an empty DataFrame: there are no rows to bind.
if records:
    # engine.begin() commits on success and rolls back on error.
    with engine.begin() as conn:
        conn.execute(query, records)
请确保 df 的列顺序与 table 的列顺序一致。
编辑 1:
感谢 Gord Thompson 的评论,我意识到如果列中有单引号,则此查询将不起作用。因此,如果列中有单引号,这里有一个修复方法:
import pandas as pd
from sqlalchemy import create_engine, text

# The connection URL must be a string literal.
engine = create_engine("postgresql://username:pass@host:port/dbname")

# Bound parameters let the driver handle quoting, so values containing single
# or double quotes are stored unchanged and `df` itself is never modified.
query = text(
    """
    INSERT INTO author (name, title, id)
    VALUES (:name, :title, :id)
    ON CONFLICT (id)
    DO UPDATE SET name = excluded.name,
                  title = excluded.title
    """
)

# One dict per row, restricted to the columns the statement binds by name.
records = df[["name", "title", "id"]].to_dict("records")

# Skip the round trip for an empty DataFrame: there are no rows to bind.
if records:
    # engine.begin() commits on success and rolls back on error.
    with engine.begin() as conn:
        conn.execute(query, records)
如果您的 DataFrame 和 SQL Table 已经包含相同的列名和类型,请考虑使用此函数。
优点:
- 适合插入行数很多的 DataFrame(分批处理)。
- 避免在代码中编写长 sql 语句。
- 快
.
from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd
def upsert_database(
    list_input: pd.DataFrame,
    engine: sql_engine,
    table: str,
    schema: str,
    chunk_size: int = 1000,
) -> None:
    """Upsert the rows of a DataFrame into an existing PostgreSQL table.

    The target table is reflected from the database and rows are sent in
    chunks as ``INSERT ... ON CONFLICT DO UPDATE`` statements that conflict
    on the table's primary key. On conflict every non-primary-key column of
    the table is overwritten with the incoming value.

    Args:
        list_input: Rows to write; column names are used as the insert keys.
        engine: SQLAlchemy engine connected to the target database.
        table: Name of the target table.
        schema: Schema that contains the target table.
        chunk_size: Maximum number of rows per INSERT statement.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1 or the reflected
            table has no primary key to conflict on.
    """
    # Local import keeps this function self-contained.
    from sqlalchemy import MetaData

    if len(list_input) == 0:
        return None
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    records = list_input.to_dict("records")

    # Reflect only the target table rather than automapping the whole schema.
    target_table = Table(table, MetaData(), autoload_with=engine, schema=schema)
    if not target_table.primary_key.columns:
        raise ValueError(f"table {table!r} has no primary key to upsert on")
    update_columns = [c.name for c in target_table.columns if not c.primary_key]

    # engine.begin() commits all chunks together and rolls back on error.
    with engine.begin() as conn:
        for start in range(0, len(records), chunk_size):
            stmt = insert(target_table).values(records[start:start + chunk_size])
            if update_columns:
                # Use the reflected primary-key constraint instead of
                # guessing that it is named "<table>_pkey".
                stmt = stmt.on_conflict_do_update(
                    constraint=target_table.primary_key,
                    set_={name: stmt.excluded[name] for name in update_columns},
                )
            else:
                # Every column is part of the key: nothing to update.
                stmt = stmt.on_conflict_do_nothing(
                    constraint=target_table.primary_key
                )
            conn.execute(stmt)
    return None
我多次遇到这个需求,最后为它创建了一个 gist。
函数如下,如果它是第一次持久化数据帧,它将创建 table,如果它已经存在,它将更新 table:
import pandas as pd
import sqlalchemy
import uuid
import os
def upsert_df(df: pd.DataFrame, table_name: str, engine: sqlalchemy.engine.Engine):
    """Create or upsert a PostgreSQL table from a DataFrame, keyed on its index.

    Implements the equivalent of ``pd.DataFrame.to_sql(..., if_exists='update')``
    (which does not exist). If ``table_name`` is not found in the ``public``
    schema it is created with ``to_sql``. Otherwise the frame is written to a
    staging table and merged with ``INSERT ... ON CONFLICT``, using the index
    columns as the conflict target. A unique constraint named
    ``unique_constraint_for_upsert`` is (re)created on those columns first.

    All statements run in one transaction, so a failure leaves neither a
    half-applied upsert nor a leftover staging table.

    Args:
        df: Data to persist; its index levels form the upsert key.
        table_name: Name of the target table.
        engine: SQLAlchemy engine connected to the PostgreSQL database.

    Returns:
        True if successful.
    """

    def quote(identifier) -> str:
        # Double embedded quotes so a name cannot break out of the identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    # Resolve labels for unnamed index levels; a bare None would otherwise end
    # up in the SQL as a column called "None". The fallbacks ("index",
    # "level_<i>") are intended to match what to_sql uses by default.
    index_names = list(df.index.names)
    if index_names == [None] and "index" not in df.columns:
        index = ["index"]
    else:
        index = [
            f"level_{i}" if name is None else name
            for i, name in enumerate(index_names)
        ]
    columns = list(df.columns)

    with engine.begin() as conn:
        # The table name is passed as a bound parameter, not interpolated.
        table_exists = conn.execute(
            sqlalchemy.text(
                "SELECT EXISTS ("
                "SELECT FROM information_schema.tables "
                "WHERE table_schema = 'public' AND table_name = :table_name)"
            ),
            {"table_name": table_name},
        ).scalar()

        # If the table does not exist, just use to_sql to create it.
        if not table_exists:
            df.to_sql(table_name, conn, index=True, index_label=index)
            return True

        # If it already exists, stage the data in a uniquely named table.
        temp_table_name = f"temp_{uuid.uuid4().hex[:6]}"
        df.to_sql(temp_table_name, conn, index=True, index_label=index)

        target = quote(table_name)
        index_sql_txt = ", ".join(quote(i) for i in index)
        # index1, index2, ..., col1, col2, ...
        headers_sql_txt = ", ".join(quote(i) for i in index + columns)

        # ON CONFLICT needs a unique constraint on the conflict columns.
        # The two ALTERs are sent separately: one statement per execute call.
        conn.execute(
            sqlalchemy.text(
                f"ALTER TABLE {target} "
                "DROP CONSTRAINT IF EXISTS unique_constraint_for_upsert"
            )
        )
        conn.execute(
            sqlalchemy.text(
                f"ALTER TABLE {target} "
                f"ADD CONSTRAINT unique_constraint_for_upsert UNIQUE ({index_sql_txt})"
            )
        )

        if columns:
            # col1 = EXCLUDED.col1, col2 = EXCLUDED.col2, ...
            assignments = ", ".join(
                f"{quote(col)} = EXCLUDED.{quote(col)}" for col in columns
            )
            conflict_action = f"DO UPDATE SET {assignments}"
        else:
            # Index-only frame: an empty SET clause would be invalid SQL.
            conflict_action = "DO NOTHING"

        # Compose and execute the upsert, then remove the staging table.
        conn.execute(
            sqlalchemy.text(
                f"INSERT INTO {target} ({headers_sql_txt}) "
                f"SELECT {headers_sql_txt} FROM {quote(temp_table_name)} "
                f"ON CONFLICT ({index_sql_txt}) {conflict_action}"
            )
        )
        conn.execute(sqlalchemy.text(f"DROP TABLE {quote(temp_table_name)}"))
    return True
我从网络资源中抓取了一些数据并将其全部存储在 pandas DataFrame 中。现在,为了利用 SQLAlchemy 提供的强大的数据库工具,我想将所述 DataFrame 转换为 Table() 对象,并最终将所有数据更新到 PostgreSQL table 中。如果可行,完成这项任务的可行方法是什么?
如果您已经有一个 pandas 数据框,您可以使用 df.to_sql 直接通过 SQLAlchemy 推送数据
from sqlalchemy import create_engine

# Engine built from the PostgreSQL connection URI (psycopg2 driver).
cnxn = create_engine(
    "postgresql+psycopg2://username:password@host:port/database"
)

# Push the DataFrame `df` into table "my_table" of schema "myschema".
df.to_sql(name="my_table", con=cnxn, schema="myschema")
如果您使用的是 PostgreSQL 9.5 或更高版本,您可以使用临时 table 和 INSERT ... ON CONFLICT
语句执行 UPSERT:
import pandas as pd
import sqlalchemy as sa

# … (`engine` is expected to be created here — presumably via
# sa.create_engine(...); that part is elided in this example)

# Everything runs on one connection inside a single transaction, so the
# temporary table is visible to every step and the whole upsert is atomic.
with engine.begin() as conn:
    # step 0.0 - create test environment
    conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
    conn.exec_driver_sql(
        "CREATE TABLE main_table (id int primary key, txt varchar(50))"
    )
    conn.exec_driver_sql(
        "INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
    )

    # step 0.1 - create DataFrame to UPSERT
    df = pd.DataFrame(
        [(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
    )

    # step 1 - create temporary table and upload DataFrame
    # ON COMMIT DROP removes the staging table when the transaction ends;
    # without it the table stays on the pooled connection and a second run
    # fails because temp_table already exists.
    conn.exec_driver_sql(
        "CREATE TEMPORARY TABLE temp_table ON COMMIT DROP "
        "AS SELECT * FROM main_table WHERE false"
    )
    df.to_sql("temp_table", conn, index=False, if_exists="append")

    # step 2 - merge temp_table into main_table
    conn.exec_driver_sql(
        """\
        INSERT INTO main_table (id, txt)
        SELECT id, txt FROM temp_table
        ON CONFLICT (id) DO
            UPDATE SET txt = EXCLUDED.txt
        """
    )

    # step 3 - confirm results
    result = conn.exec_driver_sql("SELECT * FROM main_table ORDER BY id").all()
    print(result)  # [(1, 'row 1 new text'), (2, 'new row 2 text')]
下面是我的代码,用于把 pandas DataFrame 中的数据批量插入 PostgreSQL,并在发生主键冲突时更新已有的行(INSERT ... ON CONFLICT DO UPDATE):
假设 id 是 postgresql table 和 pandas df 的唯一键,你想根据这个 id 插入和更新。
import pandas as pd
from sqlalchemy import create_engine, text

# The connection URL must be a string literal.
engine = create_engine("postgresql://username:pass@host:port/dbname")

# Values are sent as bound parameters instead of being spliced into the SQL
# text, so quotes inside the data cannot break (or inject into) the statement.
query = text(
    """
    INSERT INTO schema.table (name, title, id)
    VALUES (:name, :title, :id)
    ON CONFLICT (id)
    DO UPDATE SET name = excluded.name,
                  title = excluded.title
    """
)

# One dict per row, restricted to the columns the statement binds by name.
records = df[["name", "title", "id"]].to_dict("records")

# Skip the round trip for an empty DataFrame: there are no rows to bind.
if records:
    # engine.begin() commits on success and rolls back on error.
    with engine.begin() as conn:
        conn.execute(query, records)
请确保 df 的列顺序与 table 的列顺序一致。
编辑 1:
感谢 Gord Thompson 的评论,我意识到如果列中有单引号,则此查询将不起作用。因此,如果列中有单引号,这里有一个修复方法:
import pandas as pd
from sqlalchemy import create_engine, text

# The connection URL must be a string literal.
engine = create_engine("postgresql://username:pass@host:port/dbname")

# Bound parameters let the driver handle quoting, so values containing single
# or double quotes are stored unchanged and `df` itself is never modified.
query = text(
    """
    INSERT INTO author (name, title, id)
    VALUES (:name, :title, :id)
    ON CONFLICT (id)
    DO UPDATE SET name = excluded.name,
                  title = excluded.title
    """
)

# One dict per row, restricted to the columns the statement binds by name.
records = df[["name", "title", "id"]].to_dict("records")

# Skip the round trip for an empty DataFrame: there are no rows to bind.
if records:
    # engine.begin() commits on success and rolls back on error.
    with engine.begin() as conn:
        conn.execute(query, records)
如果您的 DataFrame 和 SQL Table 已经包含相同的列名和类型,请考虑使用此函数。 优点:
- 适合插入行数很多的 DataFrame(分批处理)。
- 避免在代码中编写长 sql 语句。
- 快
.
from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd
def upsert_database(
    list_input: pd.DataFrame,
    engine: sql_engine,
    table: str,
    schema: str,
    chunk_size: int = 1000,
) -> None:
    """Upsert the rows of a DataFrame into an existing PostgreSQL table.

    The target table is reflected from the database and rows are sent in
    chunks as ``INSERT ... ON CONFLICT DO UPDATE`` statements that conflict
    on the table's primary key. On conflict every non-primary-key column of
    the table is overwritten with the incoming value.

    Args:
        list_input: Rows to write; column names are used as the insert keys.
        engine: SQLAlchemy engine connected to the target database.
        table: Name of the target table.
        schema: Schema that contains the target table.
        chunk_size: Maximum number of rows per INSERT statement.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1 or the reflected
            table has no primary key to conflict on.
    """
    # Local import keeps this function self-contained.
    from sqlalchemy import MetaData

    if len(list_input) == 0:
        return None
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    records = list_input.to_dict("records")

    # Reflect only the target table rather than automapping the whole schema.
    target_table = Table(table, MetaData(), autoload_with=engine, schema=schema)
    if not target_table.primary_key.columns:
        raise ValueError(f"table {table!r} has no primary key to upsert on")
    update_columns = [c.name for c in target_table.columns if not c.primary_key]

    # engine.begin() commits all chunks together and rolls back on error.
    with engine.begin() as conn:
        for start in range(0, len(records), chunk_size):
            stmt = insert(target_table).values(records[start:start + chunk_size])
            if update_columns:
                # Use the reflected primary-key constraint instead of
                # guessing that it is named "<table>_pkey".
                stmt = stmt.on_conflict_do_update(
                    constraint=target_table.primary_key,
                    set_={name: stmt.excluded[name] for name in update_columns},
                )
            else:
                # Every column is part of the key: nothing to update.
                stmt = stmt.on_conflict_do_nothing(
                    constraint=target_table.primary_key
                )
            conn.execute(stmt)
    return None
我多次遇到这个需求,最后为它创建了一个 gist。
函数如下,如果它是第一次持久化数据帧,它将创建 table,如果它已经存在,它将更新 table:
import pandas as pd
import sqlalchemy
import uuid
import os
def upsert_df(df: pd.DataFrame, table_name: str, engine: sqlalchemy.engine.Engine):
    """Create or upsert a PostgreSQL table from a DataFrame, keyed on its index.

    Implements the equivalent of ``pd.DataFrame.to_sql(..., if_exists='update')``
    (which does not exist). If ``table_name`` is not found in the ``public``
    schema it is created with ``to_sql``. Otherwise the frame is written to a
    staging table and merged with ``INSERT ... ON CONFLICT``, using the index
    columns as the conflict target. A unique constraint named
    ``unique_constraint_for_upsert`` is (re)created on those columns first.

    All statements run in one transaction, so a failure leaves neither a
    half-applied upsert nor a leftover staging table.

    Args:
        df: Data to persist; its index levels form the upsert key.
        table_name: Name of the target table.
        engine: SQLAlchemy engine connected to the PostgreSQL database.

    Returns:
        True if successful.
    """

    def quote(identifier) -> str:
        # Double embedded quotes so a name cannot break out of the identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    # Resolve labels for unnamed index levels; a bare None would otherwise end
    # up in the SQL as a column called "None". The fallbacks ("index",
    # "level_<i>") are intended to match what to_sql uses by default.
    index_names = list(df.index.names)
    if index_names == [None] and "index" not in df.columns:
        index = ["index"]
    else:
        index = [
            f"level_{i}" if name is None else name
            for i, name in enumerate(index_names)
        ]
    columns = list(df.columns)

    with engine.begin() as conn:
        # The table name is passed as a bound parameter, not interpolated.
        table_exists = conn.execute(
            sqlalchemy.text(
                "SELECT EXISTS ("
                "SELECT FROM information_schema.tables "
                "WHERE table_schema = 'public' AND table_name = :table_name)"
            ),
            {"table_name": table_name},
        ).scalar()

        # If the table does not exist, just use to_sql to create it.
        if not table_exists:
            df.to_sql(table_name, conn, index=True, index_label=index)
            return True

        # If it already exists, stage the data in a uniquely named table.
        temp_table_name = f"temp_{uuid.uuid4().hex[:6]}"
        df.to_sql(temp_table_name, conn, index=True, index_label=index)

        target = quote(table_name)
        index_sql_txt = ", ".join(quote(i) for i in index)
        # index1, index2, ..., col1, col2, ...
        headers_sql_txt = ", ".join(quote(i) for i in index + columns)

        # ON CONFLICT needs a unique constraint on the conflict columns.
        # The two ALTERs are sent separately: one statement per execute call.
        conn.execute(
            sqlalchemy.text(
                f"ALTER TABLE {target} "
                "DROP CONSTRAINT IF EXISTS unique_constraint_for_upsert"
            )
        )
        conn.execute(
            sqlalchemy.text(
                f"ALTER TABLE {target} "
                f"ADD CONSTRAINT unique_constraint_for_upsert UNIQUE ({index_sql_txt})"
            )
        )

        if columns:
            # col1 = EXCLUDED.col1, col2 = EXCLUDED.col2, ...
            assignments = ", ".join(
                f"{quote(col)} = EXCLUDED.{quote(col)}" for col in columns
            )
            conflict_action = f"DO UPDATE SET {assignments}"
        else:
            # Index-only frame: an empty SET clause would be invalid SQL.
            conflict_action = "DO NOTHING"

        # Compose and execute the upsert, then remove the staging table.
        conn.execute(
            sqlalchemy.text(
                f"INSERT INTO {target} ({headers_sql_txt}) "
                f"SELECT {headers_sql_txt} FROM {quote(temp_table_name)} "
                f"ON CONFLICT ({index_sql_txt}) {conflict_action}"
            )
        )
        conn.execute(sqlalchemy.text(f"DROP TABLE {quote(temp_table_name)}"))
    return True