如何在 sqlalchemy 中使用 psycopg2.extras?
How can I use psycopg2.extras in sqlalchemy?
我想将大量条目(~600k)上传到 PostgreSQL 数据库中的一个简单 table 中,每个条目有一个外键、一个时间戳和 3 个浮点数。但是,每个条目需要 60 毫秒来执行 here, thus the whole execution would take 10 h. I have found out, that it is a performance issue of executemany()
method, however it has been solved with the execute_values()
method in psycopg2 2.7.
中描述的核心批量插入
我运行的代码如下:
#build a huge list of dicts, one dict for each entry
engine.execute(SimpleTable.__table__.insert(),
values) # around 600k dicts in a list
我看到这是一个常见问题,但是我还没有设法在 sqlalchemy 本身中找到解决方案。有没有办法告诉sqlalchemy在某些场合调用execute_values()
?有没有其他方法可以在不自己构造 SQL 语句的情况下实现巨大的插入?
感谢您的帮助!
从某种意义上说,这不是您正在寻找的答案,因为这没有解决试图指示 SQLAlchemy 使用 psycopg 附加功能的问题,并且需要 – 有点 – 手册 SQL,但是:您可以使用 raw_connection()
, which allows using COPY FROM:
从引擎访问底层的 psycopg 连接
import io
import csv
from psycopg2 import sql
def bulk_copy(engine, table, values):
csv_file = io.StringIO()
headers = list(values[0].keys())
writer = csv.DictWriter(csv_file, headers)
writer.writerows(values)
csv_file.seek(0)
# NOTE: `format()` here is *not* `str.format()`, but
# `SQL.format()`. Never use plain string formatting.
copy_stmt = sql.SQL("COPY {} (" +
",".join(["{}"] * len(headers)) +
") FROM STDIN CSV").\
format(sql.Identifier(str(table.name)),
*(sql.Identifier(col) for col in headers))
# Fetch a raw psycopg connection from the SQLAlchemy engine
conn = engine.raw_connection()
try:
with conn.cursor() as cur:
cur.copy_expert(copy_stmt, csv_file)
conn.commit()
except:
conn.rollback()
raise
finally:
conn.close()
然后
bulk_copy(engine, SimpleTable.__table__, values)
与执行 INSERT 语句相比,这应该快得多。在这台机器上移动 600,000 条记录大约需要 8 秒,~13µs/record。您还可以将原始连接和游标与 extras 包一起使用。
与此同时,create_engine()
函数上的 use_batch_mode
标志成为可能(从 SqlAlchemy 1.2.0 开始)。参见docs。它使用 psycopg.extras
.
中的 execute_batch()
函数
我想将大量条目(~600k)上传到 PostgreSQL 数据库中的一个简单 table 中,每个条目有一个外键、一个时间戳和 3 个浮点数。但是,每个条目需要 60 毫秒来执行 here, thus the whole execution would take 10 h. I have found out, that it is a performance issue of executemany()
method, however it has been solved with the execute_values()
method in psycopg2 2.7.
我运行的代码如下:
#build a huge list of dicts, one dict for each entry
engine.execute(SimpleTable.__table__.insert(),
values) # around 600k dicts in a list
我看到这是一个常见问题,但是我还没有设法在 sqlalchemy 本身中找到解决方案。有没有办法告诉sqlalchemy在某些场合调用execute_values()
?有没有其他方法可以在不自己构造 SQL 语句的情况下实现巨大的插入?
感谢您的帮助!
从某种意义上说,这不是您正在寻找的答案,因为这没有解决试图指示 SQLAlchemy 使用 psycopg 附加功能的问题,并且需要 – 有点 – 手册 SQL,但是:您可以使用 raw_connection()
, which allows using COPY FROM:
import io
import csv
from psycopg2 import sql
def bulk_copy(engine, table, values):
csv_file = io.StringIO()
headers = list(values[0].keys())
writer = csv.DictWriter(csv_file, headers)
writer.writerows(values)
csv_file.seek(0)
# NOTE: `format()` here is *not* `str.format()`, but
# `SQL.format()`. Never use plain string formatting.
copy_stmt = sql.SQL("COPY {} (" +
",".join(["{}"] * len(headers)) +
") FROM STDIN CSV").\
format(sql.Identifier(str(table.name)),
*(sql.Identifier(col) for col in headers))
# Fetch a raw psycopg connection from the SQLAlchemy engine
conn = engine.raw_connection()
try:
with conn.cursor() as cur:
cur.copy_expert(copy_stmt, csv_file)
conn.commit()
except:
conn.rollback()
raise
finally:
conn.close()
然后
bulk_copy(engine, SimpleTable.__table__, values)
与执行 INSERT 语句相比,这应该快得多。在这台机器上移动 600,000 条记录大约需要 8 秒,~13µs/record。您还可以将原始连接和游标与 extras 包一起使用。
与此同时,create_engine()
函数上的 use_batch_mode
标志成为可能(从 SqlAlchemy 1.2.0 开始)。参见docs。它使用 psycopg.extras
.
execute_batch()
函数