使用 Flask-SQLAlchemy 批量插入
Bulk inserts with Flask-SQLAlchemy
我正在使用 Flask-SQLAlchemy 执行相当大的 60k 行批量插入。我在这个table上也是多对多的关系,所以我不能用db.engine.execute
。在插入之前,我需要在数据库中找到类似的项目,如果发现重复的项目,则将插入更改为更新。
我可以事先做这个检查,然后通过 db.engine.execute
进行批量插入,但我需要插入时行的主键。
目前,我在每次插入时执行 db.session.add()
和 db.session.commit()
,我每秒只能插入 3-4 次。
我 运行 一个分析器来查看瓶颈在哪里,db.session.commit()
似乎占用了 60% 的时间。
有没有什么方法可以让我更快地执行此操作,也许是通过对提交进行分组,但哪种方法可以返回主键?
这是我的模型的样子:
class Item(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(1024), nullable=True)
created = db.Column(db.DateTime())
tags_relationship = db.relationship('Tag', secondary=tags, backref=db.backref('items', lazy='dynamic'))
tags = association_proxy('tags_relationship', 'text')
class Tag(db.Model):
id = db.Column(db.Integer, primary_key=True)
text = db.Column(db.String(255))
我的插入操作是:
for item in items:
if duplicate:
update_existing_item
else:
x = Item()
x.title = "string"
x.created = datetime.datetime.utcnow()
for tag in tags:
if not tag_already_exists:
y = Tag()
y.text = "tagtext"
x.tags_relationship.append(y)
db.session.add(y)
db.session.commit()
else:
x.tags_relationship.append(existing_tag)
db.session.add(x)
db.session.commit()
也许您应该尝试db.session.flush()
将数据发送到服务器,这意味着将生成任何主键。最后,您可以 db.session.commit()
实际提交交易。
我使用下面的代码快速读取一个pandas DataFrame 的内容到SQLite 中。请注意,它绕过了 SQLAlchemy 的 ORM 功能。 myClass 在这种情况下是一个 db.Model 派生的 class,它有一个分配给它的表名。正如代码片段所提到的,我改编了
l = df.to_dict('records')
# bulk save the dictionaries, circumventing the slow ORM interface
# c.f. https://gist.github.com/shrayasr/5df96d5bc287f3a2faa4
connection.engine.execute(
myClass.__table__.insert(),
l
)
我正在使用 Flask-SQLAlchemy 执行相当大的 60k 行批量插入。我在这个table上也是多对多的关系,所以我不能用db.engine.execute
。在插入之前,我需要在数据库中找到类似的项目,如果发现重复的项目,则将插入更改为更新。
我可以事先做这个检查,然后通过 db.engine.execute
进行批量插入,但我需要插入时行的主键。
目前,我在每次插入时执行 db.session.add()
和 db.session.commit()
,我每秒只能插入 3-4 次。
我 运行 一个分析器来查看瓶颈在哪里,db.session.commit()
似乎占用了 60% 的时间。
有没有什么方法可以让我更快地执行此操作,也许是通过对提交进行分组,但哪种方法可以返回主键?
这是我的模型的样子:
class Item(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(1024), nullable=True)
created = db.Column(db.DateTime())
tags_relationship = db.relationship('Tag', secondary=tags, backref=db.backref('items', lazy='dynamic'))
tags = association_proxy('tags_relationship', 'text')
class Tag(db.Model):
id = db.Column(db.Integer, primary_key=True)
text = db.Column(db.String(255))
我的插入操作是:
for item in items:
if duplicate:
update_existing_item
else:
x = Item()
x.title = "string"
x.created = datetime.datetime.utcnow()
for tag in tags:
if not tag_already_exists:
y = Tag()
y.text = "tagtext"
x.tags_relationship.append(y)
db.session.add(y)
db.session.commit()
else:
x.tags_relationship.append(existing_tag)
db.session.add(x)
db.session.commit()
也许您应该尝试db.session.flush()
将数据发送到服务器,这意味着将生成任何主键。最后,您可以 db.session.commit()
实际提交交易。
我使用下面的代码快速读取一个pandas DataFrame 的内容到SQLite 中。请注意,它绕过了 SQLAlchemy 的 ORM 功能。 myClass 在这种情况下是一个 db.Model 派生的 class,它有一个分配给它的表名。正如代码片段所提到的,我改编了
l = df.to_dict('records')
# bulk save the dictionaries, circumventing the slow ORM interface
# c.f. https://gist.github.com/shrayasr/5df96d5bc287f3a2faa4
connection.engine.execute(
myClass.__table__.insert(),
l
)