Optimal approach to bulk insert of pandas dataframe into PostgreSQL table
I need to upload multiple excel files into a postgresql table, but the files can overlap each other in several records, so I need to watch out for IntegrityErrors. I am following two approaches:
cursor.copy_from: the fastest approach, but I don't know how to catch and control all the IntegrityErrors caused by the duplicate records
streamCSV = StringIO()
streamCSV.write(invoicing_info.to_csv(index=None, header=None, sep=';'))
streamCSV.seek(0)

with conn.cursor() as c:
    c.copy_from(streamCSV, "staging.table_name", columns=dataframe.columns, sep=';')
    conn.commit()
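One way to keep copy_from while avoiding IntegrityErrors caused by duplicates inside the file itself is to drop repeated keys from the CSV buffer before copying. A minimal sketch, assuming the id is the first column and the separator is `;` as in the code above:

```python
import csv
from io import StringIO

def dedupe_csv_buffer(buffer, key_index=0, sep=';'):
    """Drop rows whose key column repeats an earlier row (first wins).

    Returns a fresh StringIO positioned at 0, ready for copy_from.
    """
    buffer.seek(0)
    seen = set()
    out = StringIO()
    writer = csv.writer(out, delimiter=sep, lineterminator='\n')
    for row in csv.reader(buffer, delimiter=sep):
        if not row:
            continue
        key = row[key_index]
        if key in seen:
            continue  # duplicate key: skip the row entirely
        seen.add(key)
        writer.writerow(row)
    out.seek(0)
    return out
```

This only removes duplicates within the buffer; rows that collide with data already in the target table still raise IntegrityError, which is where the temp-table approach from the answers comes in.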
cursor.execute: I can count and handle every exception, but this approach is very slow.
data = invoicing_info.to_dict(orient='records')

with cursor as c:
    for entry in data:
        try:
            c.execute(DLL_INSERT, entry)
            successful_inserts += 1
            connection.commit()
            print('Successful insert. Operation number {}'.format(successful_inserts))
        except psycopg2.IntegrityError as duplicate:
            duplicate_registers += 1
            connection.rollback()
            print('Duplicate entry. Operation number {}'.format(duplicate_registers))
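The per-row loop can usually be replaced by a single multi-row statement with ON CONFLICT DO NOTHING (PostgreSQL 9.5+), which skips duplicates server-side in one round trip. A hedged sketch, where the table name, columns, and conflict target are placeholders matching the example above:

```python
def build_bulk_insert(table, columns, conflict_target):
    """Statement for psycopg2.extras.execute_values: one INSERT for the
    whole batch, silently skipping rows that violate the constraint."""
    cols = ', '.join(columns)
    return (
        'INSERT INTO {} ({}) VALUES %s '
        'ON CONFLICT ({}) DO NOTHING'.format(table, cols, conflict_target)
    )

# Assumed usage (needs a live connection; values must match the column order):
# from psycopg2.extras import execute_values
# sql = build_bulk_insert('staging.table_name', list(invoicing_info.columns), 'id')
# rows = [tuple(d[c] for c in invoicing_info.columns) for d in data]
# execute_values(c, sql, rows, page_size=len(rows))  # one statement, so
# successful_inserts = c.rowcount                    # rowcount covers all rows
# duplicate_registers = len(rows) - successful_inserts
```

Setting page_size to the batch size keeps execute_values from splitting the batch into several statements, so rowcount reflects the whole insert.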
At the end of the routine I need to know the following information:
print("Initial shape: {}".format(invoicing_info.shape))
print("Successful inserts: {}".format(successful_inserts))
print("Duplicate entries: {}".format(duplicate_registers))
How can I modify the first approach to control all the exceptions? How can I optimize the second approach?
Since you have repeating IDs in different excel sheets, you have to answer for yourself how you decide which excel sheet's data to trust.
When you work with several tables and just need at least one row from each conflicting pair, you can always do the following:
- create a temporary table for each excel sheet
- upload the data of each excel sheet to its temporary table (just like you do the bulk upload now)
- do the insert from a SELECT DISTINCT ON (id), like this:
INSERT INTO staging.table_name(id, col1, col2 ...)
SELECT DISTINCT ON(id)
id, col1, col2
FROM
(
SELECT id, col1, col2 ...
FROM staging.temp_table_for_excel_sheet1
UNION
SELECT id, col1, col2 ...
FROM staging.temp_table_for_excel_sheet2
UNION
SELECT id, col1, col2 ...
FROM staging.temp_table_for_excel_sheet3
) as data
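In Python terms, DISTINCT ON (id) keeps exactly one row per id; without an ORDER BY, which row of the group survives is unspecified. A toy emulation with made-up tuples (the first occurrence wins here, but PostgreSQL makes no such promise):

```python
def distinct_on_id(rows):
    # Keep one row per id; dict insertion order makes the first seen win.
    kept = {}
    for row in rows:
        kept.setdefault(row[0], row)
    return list(kept.values())

sheet1 = [(1, 'a'), (2, 'b')]
sheet2 = [(1, 'x'), (3, 'c')]   # id 1 collides with sheet1
merged = distinct_on_id(sheet1 + sheet2)
```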
With an insert like this, postgreSQL will take a random row out of each set of rows with a non-unique ID. If you want to trust the first record instead, you can add some ordering:
INSERT INTO staging.table_name(id, col1, col2 ...)
SELECT DISTINCT ON(id)
id, col1, col2
FROM
(
SELECT id, 1 as ordering_column, col1, col2 ...
FROM staging.temp_table_for_excel_sheet1
UNION
SELECT id, 2 as ordering_column, col1, col2 ...
FROM staging.temp_table_for_excel_sheet2
UNION
SELECT id, 3 as ordering_column, col1, col2 ...
FROM staging.temp_table_for_excel_sheet3
) as data
ORDER BY id, ordering_column
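The effect of the ordering_column can be sketched in plain Python: sort by (id, ordering_column), then keep the first row per id, which mirrors DISTINCT ON (id) with ORDER BY id, ordering_column (row values below are made up):

```python
def distinct_on_id_ordered(rows):
    # rows are (id, ordering_column, payload); after sorting by
    # (id, ordering_column), the first row seen for each id wins.
    kept = {}
    for row in sorted(rows, key=lambda r: (r[0], r[1])):
        kept.setdefault(row[0], row)
    return list(kept.values())

rows = [(1, 2, 'sheet2'), (1, 1, 'sheet1'), (2, 3, 'sheet3')]
preferred = distinct_on_id_ordered(rows)  # sheet1 wins for id 1
```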
Number of initial records:
SELECT sum(count)
FROM
(
SELECT count(*) as count FROM temp_table_for_excel_sheet1
UNION ALL
SELECT count(*) as count FROM temp_table_for_excel_sheet2
UNION ALL
SELECT count(*) as count FROM temp_table_for_excel_sheet3
) as data
After this bulk insert is done, you can run select count(*) FROM staging.table_name to get the total number of inserted records.
For the duplicate count you can run:
SELECT sum(count)
FROM
(
SELECT count(*) as count
FROM temp_table_for_excel_sheet2 WHERE id in (select id FROM temp_table_for_excel_sheet1)
UNION ALL
SELECT count(*) as count
FROM temp_table_for_excel_sheet3 WHERE id in (select id FROM temp_table_for_excel_sheet1)
UNION ALL
SELECT count(*) as count
FROM temp_table_for_excel_sheet3 WHERE id in (select id FROM temp_table_for_excel_sheet2)
) as data
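A simpler cross-check is also possible: every row that fails to make it into the target table is a duplicate that was dropped exactly once, so duplicates = initial rows - inserted rows. A sketch with made-up counts:

```python
sheet_counts = [100, 80, 60]       # count(*) of each temp table (invented numbers)
initial_total = sum(sheet_counts)  # what the sum(count) query above returns
inserted = 210                     # e.g. result of SELECT count(*) FROM staging.table_name
duplicate_registers = initial_total - inserted
```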
If the excel sheets contain duplicate records, Pandas looks like a possible choice for identifying and eliminating the duplicates: https://33sticks.com/python-for-business-identifying-duplicate-data/. Or is the issue that different records in different sheets share the same id/index? If so, a similar approach could work: use Pandas to isolate the IDs that are used multiple times, then correct them with unique identifiers before attempting the upload to the SQL database.
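The linked article relies on pandas' duplicated() method; the same isolation of re-used IDs can be sketched with just the standard library (the sheet contents below are invented):

```python
from collections import Counter

def reused_ids(*sheets_ids):
    # IDs that appear more than once across all sheets combined.
    counts = Counter()
    for ids in sheets_ids:
        counts.update(ids)
    return sorted(i for i, n in counts.items() if n > 1)

sheet1_ids = [1, 2, 3]
sheet2_ids = [3, 4, 4]   # 3 collides across sheets, 4 within one sheet
```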
For the bulk upload, I would use an ORM. SQLAlchemy has some great information about bulk operations: http://docs.sqlalchemy.org/en/rel_1_0/orm/persistence_techniques.html#bulk-operations, and there's a related discussion here: Bulk insert with SQLAlchemy ORM