Postgresql 如何使用 CSV COPY TO STDIN 进行冲突更新?

How Postgresql COPY TO STDIN With CSV do on conflic do update?

我想做

 " on conflict (time) do update set name , description "

但我不知道什么时候将 stdin 与 csv 一起使用,我不知道什么名称等于什么?和描述等于什么...

table_a:

xxx.csv:

with open('xxx/xxx.csv', 'r', encoding='utf8') as f:
    sql = """
    COPY table_a FROM STDIN With CSV on conflict (time) 
    do update set name=??, description=??;
    """
    cur.copy_expert(sql, f)
    conn.commit()

https://www.postgresql.org/docs/current/static/sql-copy.html

在 postgres

中没有 copy ... on conflict do 语句

https://www.postgresql.org/docs/current/static/sql-insert.html

insert ... on conflict do

感谢各位高手的解答

这是我的解决方案。

sql = """
CREATE TABLE temp_h (
    time ,
    name,
    description
);
COPY temp_h FROM STDIN With CSV;

INSERT INTO table_a(time, name, description)
SELECT *
FROM temp_h ON conflict (time) 
DO update set name=EXCLUDED.name, description=EXCLUDED.description;

DROP TABLE temp_h;
"""

this SO post 中,有两个答案 - 结合在一起 - 为成功使用 ON CONFLICT 提供了一个很好的解决方案。下面的示例使用 ON CONFLICT DO NOTHING;:

CREATE TEMP TABLE tmp_table 
(LIKE main_table INCLUDING DEFAULTS)
ON COMMIT DROP;

COPY tmp_table FROM 'full/file/name/here';

INSERT INTO main_table
SELECT *
FROM tmp_table
ON CONFLICT DO NOTHING;

将 main_table 的两个实例替换为您的 table 的名称。

我已经成功完成了批量 upsert 的功能(欢迎提出建议):

import io
from sqlalchemy.engine import Engine
from sqlalchemy.ext import declarative_base

BaseModel = declarative_base()


def upsert_bulk(engine: Engine, model: BaseModel, data: io.StringIO) -> None:
    """
    Fast way to upsert multiple entries at once

    :param `db`: DB Session
    :param `data`: CSV in a stream object
    """
    table_name = model.__tablename__
    temp_table_name = f"temp_{table_name}"

    columns = [c.key for c in model.__table__.columns]

    # Select only columns to be updated (in my case, all non-id columns)
    variable_columns = [c for c in columns if c != "id"]

    # Create string with set of columns to be updated
    update_set = ", ".join([f"{v}=EXCLUDED.{v}" for v in variable_columns])

    # Rewind data and prepare it for `copy_from`
    data.seek(0)

    with conn.cursor() as cur:
        # Creates temporary empty table with same columns and types as
        # the final table
        cur.execute(
            f"""
            CREATE TEMPORARY TABLE {temp_table_name} (LIKE {table_name})
            ON COMMIT DROP
            """
        )

        # Copy stream data to the created temporary table in DB
        cur.copy_from(data, temp_table_name)

        # Inserts copied data from the temporary table to the final table
        # updating existing values at each new conflict
        cur.execute(
            f"""
            INSERT INTO {table_name}({', '.join(columns)})
            SELECT * FROM {temp_table_name}
            ON CONFLICT (id) DO UPDATE SET {update_set}
            """
        )

        # Drops temporary table (I believe this step is unnecessary,
        # but tables sizes where growing without any new data modifications
        # if this command isn't executed)
        cur.execute(f"DROP TABLE {temp_table_name}")

        # Commit everything through cursor
        conn.commit()

    conn.close()