将 pandas 数据框批量插入 PostgreSQL table 的最佳方法

Optimal approach to bulk insert of pandas dataframe into PostgreSQL table

我需要将多个 excel 文件上传到 postgresql table 但它们可以在多个寄存器中相互重叠,因此我需要注意 IntegrityErrors。我遵循两种方法:

cursor.copy_from:最快的方法,但由于重复的寄存器

,我不知道如何捕获和控制所有Integrityerrors
streamCSV = StringIO()
streamCSV.write(invoicing_info.to_csv(index=None, header=None, sep=';')) 
streamCSV.seek(0)  

with conn.cursor() as c:
    c.copy_from(streamCSV, "staging.table_name", columns=dataframe.columns, sep=';')
    conn.commit()

cursor.execute:我可以计算并处理每个异常,但它非常 减缓。

data = invoicing_info.to_dict(orient='records')

with cursor as c:
    for entry in data:
        try:
            c.execute(DLL_INSERT, entry)
            successful_inserts += 1
            connection.commit()
            print('Successful insert. Operation number {}'.format(successful_inserts))
        except psycopg2.IntegrityError as duplicate:
            duplicate_registers += 1
            connection.rollback()
            print('Duplicate entry. Operation number {}'.format(duplicate_registers))

在例程结束时,我需要确定以下信息:

print("Initial shape: {}".format(invoicing_info.shape))
print("Successful inserts: {}".format(successful_inserts))
print("Duplicate entries: {}".format(duplicate_registers))

如何修改第一种方法来控制所有异常?如何优化第二种方法?

虽然您在不同的 excel sheet 中有重复的 ID,但您必须自己回答如何决定 excel sheet 信任的数据?

当您使用多个 table 时,将使用方法从冲突的对中获取至少一行,您始终可以执行以下操作:

  • 为每个 excel sheet
  • 创建临时 table
  • 将数据上传到每个 table excel sheet(就像你现在批量上传一样)
  • 从 select 中选择不同的(id)进行插入,方式如下:

INSERT INTO staging.table_name(id, col1, col2 ...)
SELECT DISTINCT ON(id) 
     id, col1, col2
FROM 
(
    SELECT id, col1, col2 ... 
       FROM staging.temp_table_for_excel_sheet1
    UNION
    SELECT id, col1, col2 ... 
       FROM staging.temp_table_for_excel_sheet2
    UNION
    SELECT id, col1, col2 ... 
       FROM staging.temp_table_for_excel_sheet3
) as data

使用这样的插入 postgreSQL 将从 non-unique 个 ID 集中取出随机行。

如果您想信任第一条记录,您可以添加一些顺序:

INSERT INTO staging.table_name(id, col1, col2 ...)
SELECT DISTINCT ON(id) 
     id, ordering_column col1, col2
FROM 
(
    SELECT id, 1 as ordering_column, col1, col2 ... 
       FROM staging.temp_table_for_excel_sheet1
    UNION
    SELECT id, 2 as ordering_column, col1, col2 ... 
       FROM staging.temp_table_for_excel_sheet2
    UNION
    SELECT id, 3 as ordering_column, col1, col2 ... 
       FROM staging.temp_table_for_excel_sheet3
) as data
ORDER BY ordering_column

初始对象数:

SELECT sum(count)
FROM 
( 
  SELECT count(*) as count FROM temp_table_for_excel_sheet1
  UNION
  SELECT count(*) as count FROM temp_table_for_excel_sheet2
  UNION
  SELECT count(*) as count FROM temp_table_for_excel_sheet3
) as data

完成此批量插入后,您可以运行 select count(*) FROM staging.table_name 获得插入记录总数的结果

对于重复计数,您可以 运行:

SELECT sum(count)
FROM 
(
SELECT count(*) as count 
FROM  temp_table_for_excel_sheet2 WHERE id in (select id FROM temp_table_for_excel_sheet1 )

UNION

SELECT count(*) as count 
FROM  temp_table_for_excel_sheet3 WHERE id in (select id FROM temp_table_for_excel_sheet1 )
)

UNION

SELECT count(*) as count 
FROM  temp_table_for_excel_sheet3 WHERE id in (select id FROM temp_table_for_excel_sheet2 )
) as data

如果 excel 工作表包含重复记录,Pandas 似乎是识别和消除重复记录的可能选择:https://33sticks.com/python-for-business-identifying-duplicate-data/。或者是不同表中不同记录具有相同id/index的问题?如果是这样,类似的方法可以在您使用 Pandas 隔离多次使用的 ID,然后在尝试上传到 SQL 数据库之前使用唯一标识符更正它们。

对于批量上传,我会使用 ORM。 SQLAlchemy 有一些关于批量上传的重要信息:http://docs.sqlalchemy.org/en/rel_1_0/orm/persistence_techniques.html#bulk-operations, and there's a related discussion here: Bulk insert with SQLAlchemy ORM