pandas 或 psycopg2 中的 Postgres 9.5 upsert 命令?

Postgres 9.5 upsert command in pandas or psycopg2?

我看到的大多数示例都是人们使用 ON CONFLICT DO UPDATE 语法将单行插入数据库。

有人有使用 SQLAlchemy 或 pandas.to_sql 的示例吗?

我 99% 的插入都使用 psycopg2 COPY 命令(所以我保存一个 csv 或 stringio,然后批量插入),另外 1% 是 pd.to_sql。我检查新行或维度的所有逻辑都在 Python.

中完成
def find_new_rows(existing, current, id_col):
        current[id_col] = current[id_col].astype(int)
        x = existing[['datetime', id_col, 'key1']]
        y = current[['datetime', id_col, 'key2']]
        final = pd.merge(y, x, how='left', on=['datetime', id_col])
        final = final[~(final['key2'] == final['key1'])]
        final = final.drop(['key1'], axis=1)
        current = pd.merge(current, final, how='left', on=['datetime', id_col])
        current = current.loc[current['key2_y'] == 1]
        current.drop(['key2_x', 'key2_y'], axis=1, inplace=True)
        return current

谁能告诉我一个使用新的 PostgreSQL 语法进行 pyscopg2 upsert 的例子?一个常见的用例是检查维度变化(每天 50k - 100k 行,我将其与现有值进行比较)这是 CONFLICT DO NOTHING 仅添加新行。

另一个用例是我有随时间变化的事实数据。我只取最近的值(我目前使用 select 不同的视图),但如果可能的话,UPSERT 会更好。

仅供参考,这是我目前使用的解决方案。

它似乎很适合我的目的。不过,我不得不添加一行以用 None 替换空 (NaT) 时间戳,因为我在将每一行加载到数据库中时遇到错误。

def create_update_query(table):
    """This function creates an upsert query which replaces existing data based on primary key conflicts"""
    columns = ', '.join([f'{col}' for col in DATABASE_COLUMNS])
    constraint = ', '.join([f'{col}' for col in PRIMARY_KEY])
    placeholder = ', '.join([f'%({col})s' for col in DATABASE_COLUMNS])
    updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in DATABASE_COLUMNS])
    query = f"""INSERT INTO {table} ({columns}) 
                VALUES ({placeholder}) 
                ON CONFLICT ({constraint}) 
                DO UPDATE SET {updates};"""
    query.split()
    query = ' '.join(query.split())
    return query


def load_updates(df, table, connection):
    conn = connection.get_conn()
    cursor = conn.cursor()
    df1 = df.where((pd.notnull(df)), None)
    insert_values = df1.to_dict(orient='records')
    for row in insert_values:
        cursor.execute(create_update_query(table=table), row)
        conn.commit()
    row_count = len(insert_values)
    logging.info(f'Inserted {row_count} rows.')
    cursor.close()
    del cursor
    conn.close()

这是我的代码,用于批量插入和插入来自 pandas 数据帧的 postgresql 的冲突更新查询:

假设 id 是 postgresql table 和 pandas df 的唯一键,你想根据这个 id 插入和更新。

import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text(f""" 
                INSERT INTO schema.table(name, title, id)
                VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
                ON CONFLICT (id)
                DO  UPDATE SET name= excluded.name,
                               title= excluded.title
         """)
engine.execute(query)

确保您的 df 列必须与您的 table.

的顺序相同

对于我的情况,我先写了一个临时 table,然后将临时 table 合并到我想要插入的实际 table 中。以这种方式执行更新插入可避免字符串中可能包含单引号的任何冲突。

    def upsert_dataframe_to_table(self, table_name: str, df: pd.DataFrame, schema: str, id_col:str):
    """
    Takes the given dataframe and inserts it into the table given. The data is inserted unless the key for that
    data already exists in the dataframe. If the key already exists, the data for that key is overwritten.

    :param table_name: The name of the table to send the data
    :param df: The dataframe with the data to send to the table
    :param schema: the name of the schema where the table exists
    :param id_col: The name of the primary key column
    :return: None
    """
    engine = create_engine(
        f'postgresql://{postgres_configs["username"]}:{postgres_configs["password"]}@{postgres_configs["host"]}'
        f':{postgres_configs["port"]}/{postgres_configs["db"]}'
    )
    df.to_sql('temp_table', engine, if_exists='replace')
    updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in df.columns if col != id_col])
    columns = ', '.join([f'{col}' for col in df.columns])
    query = f'INSERT INTO "{schema}".{table_name} ({columns}) ' \
            f'SELECT {columns} FROM temp_table ' \
            f'ON CONFLICT ({id_col}) DO ' \
            f'UPDATE SET {updates} '

    self.cursor.execute(query)
    self.cursor.execute('DROP TABLE temp_table')
    self.conn.commit()