如何将 pandas DataFrame 更新到 PostgreSQL table?

How to upsert pandas DataFrame to PostgreSQL table?

我从网络资源中抓取了一些数据并将其全部存储在 pandas DataFrame 中。现在,为了利用 SQLAlchemy 提供的强大的数据库工具,我想将所述 DataFrame 转换为 Table() 对象,并最终将所有数据更新到 PostgreSQL table 中。如果可行,完成这项任务的可行方法是什么?

如果您已经有一个 pandas 数据框,您可以使用 df.to_sql 直接通过 SQLAlchemy 推送数据

from sqlalchemy import create_engine
#create a connection from Postgre URI
cnxn = create_engine("postgresql+psycopg2://username:password@host:port/database")
#write dataframe to database
df.to_sql("my_table", con=cnxn, schema="myschema")

如果您使用的是 PostgreSQL 9.5 或更高版本,您可以使用临时 table 和 INSERT ... ON CONFLICT 语句执行 UPSERT:

import sqlalchemy as sa

# …

with engine.begin() as conn:
    # step 0.0 - create test environment
    conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
    conn.exec_driver_sql(
        "CREATE TABLE main_table (id int primary key, txt varchar(50))"
    )
    conn.exec_driver_sql(
        "INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
    )
    # step 0.1 - create DataFrame to UPSERT
    df = pd.DataFrame(
        [(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
    )
    
    # step 1 - create temporary table and upload DataFrame
    conn.exec_driver_sql(
        "CREATE TEMPORARY TABLE temp_table AS SELECT * FROM main_table WHERE false"
    )
    df.to_sql("temp_table", conn, index=False, if_exists="append")

    # step 2 - merge temp_table into main_table
    conn.exec_driver_sql(
        """\
        INSERT INTO main_table (id, txt) 
        SELECT id, txt FROM temp_table
        ON CONFLICT (id) DO
            UPDATE SET txt = EXCLUDED.txt
        """
    )

    # step 3 - confirm results
    result = conn.exec_driver_sql("SELECT * FROM main_table ORDER BY id").all()
    print(result)  # [(1, 'row 1 new text'), (2, 'new row 2 text')]

这是我的代码,用于批量插入和插入来自 pandas 数据帧的 postgresql 的冲突更新查询:

假设 id 是 postgresql table 和 pandas df 的唯一键,你想根据这个 id 插入和更新。

import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text(f""" 
                INSERT INTO schema.table(name, title, id)
                VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
                ON CONFLICT (id)
                DO  UPDATE SET name= excluded.name,
                               title= excluded.title
         """)
engine.execute(query)

确保您的 df 列必须与 table 的顺序相同。

编辑 1:

感谢 Gord Thompson 的评论,我意识到如果列中有单引号,则此查询将不起作用。因此,如果列中有单引号,这里有一个修复方法:

import pandas as pd
from sqlalchemy import create_engine, text

df.name = df.name.str.replace("'", "''")
df.title = df.title.str.replace("'", "''")
engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text(""" 
            INSERT INTO author(name, title, id)
            VALUES %s
            ON CONFLICT (id)
            DO  UPDATE SET name= excluded.name,
                           title= excluded.title
     """ % ','.join([str(i) for i in list(df.to_records(index=False))]).replace('"', "'"))
engine.execute(query)

如果您的 DataFrame 和 SQL Table 已经包含相同的列名和类型,请考虑使用此函数。 优点:

  • 如果您要插入一个长数据框,那就太好了。 (批处理)
  • 避免在代码中编写长 sql 语句。

.

from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd


def upsert_database(list_input: pd.DataFrame, engine: sql_engine, table: str, schema: str) -> None:
    if len(list_input) == 0:
        return None
    flattened_input = list_input.to_dict('records')
    with engine.connect() as conn:
        base = automap_base()
        base.prepare(engine, reflect=True, schema=schema)
        target_table = Table(table, base.metadata,
                             autoload=True, autoload_with=engine, schema=schema)
        chunks = [flattened_input[i:i + 1000] for i in range(0, len(flattened_input), 1000)]
        for chunk in chunks:
            stmt = insert(target_table).values(chunk)
            update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
            conn.execute(stmt.on_conflict_do_update(
                constraint=f'{table}_pkey',
                set_=update_dict)
            )

我多次需要这个,我最终创建了一个 gist for it

函数如下,如果它是第一次持久化数据帧,它将创建 table,如果它已经存在,它将更新 table:

import pandas as pd
import sqlalchemy
import uuid
import os

def upsert_df(df: pd.DataFrame, table_name: str, engine: sqlalchemy.engine.Engine):
    """Implements the equivalent of pd.DataFrame.to_sql(..., if_exists='update')
    (which does not exist). Creates or updates the db records based on the
    dataframe records.
    Conflicts to determine update are based on the dataframes index.
    This will set unique keys constraint on the table equal to the index names
    1. Create a temp table from the dataframe
    2. Insert/update from temp table into table_name
    Returns: True if successful
    """

    # If the table does not exist, we should just use to_sql to create it
    if not engine.execute(
        f"""SELECT EXISTS (
            SELECT FROM information_schema.tables 
            WHERE  table_schema = 'public'
            AND    table_name   = '{table_name}');
            """
    ).first()[0]:
        df.to_sql(table_name, engine)
        return True

    # If it already exists...
    temp_table_name = f"temp_{uuid.uuid4().hex[:6]}"
    df.to_sql(temp_table_name, engine, index=True)

    index = list(df.index.names)
    index_sql_txt = ", ".join([f'"{i}"' for i in index])
    columns = list(df.columns)
    headers = index + columns
    headers_sql_txt = ", ".join(
        [f'"{i}"' for i in headers]
    )  # index1, index2, ..., column 1, col2, ...

    # col1 = exluded.col1, col2=excluded.col2
    update_column_stmt = ", ".join([f'"{col}" = EXCLUDED."{col}"' for col in columns])

    # For the ON CONFLICT clause, postgres requires that the columns have unique constraint
    query_pk = f"""
    ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS unique_constraint_for_upsert;
    ALTER TABLE "{table_name}" ADD CONSTRAINT unique_constraint_for_upsert UNIQUE ({index_sql_txt});
    """
    engine.execute(query_pk)

    # Compose and execute upsert query
    query_upsert = f"""
    INSERT INTO "{table_name}" ({headers_sql_txt}) 
    SELECT {headers_sql_txt} FROM "{temp_table_name}"
    ON CONFLICT ({index_sql_txt}) DO UPDATE 
    SET {update_column_stmt};
    """
    engine.execute(query_upsert)
    engine.execute(f"DROP TABLE {temp_table_name}")

    return True