多个写入线程上的 SQLalchemy
Sqlalchemy on multiple write threads
以下代码在 Python 3.6+ 中有效,在 Python 3.4.3 中无效,不确定在哪个版本中失败。为什么是这样?我的印象是 sqlalchemy 可能会将多个 readers/writers 隐藏到基于文件的数据库中,方法是将其隐藏在调用的序列化程序后面。无论如何,这表明我没有正确处理此问题 - 如何在版本 < 3.6 中插入多个线程或主线程中的一个线程?
我在 sqlalchemy session()
级别尝试过
但只能让它与引擎一起工作,现在我只能在 3.6 上找到它。
def insert_inventory_table(conn, table, inventory):
conn.execute(table.insert(), inventory)
def results_table(conn, table):
q = select([table])
data = conn.execute(q).fetchall()
print('{!r}'.format(data))
def main_0():
engine = create_engine('sqlite://', connect_args={'check_same_thread' : False})
conn = engine.connect()
metadata = MetaData(engine)
table = Table('inventory',
metadata,
Column('item_no', Integer, primary_key=True, autoincrement=True),
Column('desc', String(255), nullable=False),
Column('volume', Integer, nullable=False)
)
metadata.create_all()
some_inventory = [{'item_no' : 0, 'desc' : 'drone', 'volume' : 12},
{'item_no' : 1, 'desc' : 'puddle jumper', 'volume' : 2},
{'item_no' : 2, 'desc' : 'pet monkey', 'volume' : 1},
{'item_no' : 3, 'desc' : 'bowling ball', 'volume' : 4},
{'item_no' : 4, 'desc' : 'electric guitar', 'volume' : 3},
{'item_no' : 5, 'desc' : 'bookends', 'volume' : 2}]
thread_0 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[0:3]))
thread_1 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[3:]))
thread_0.start()
thread_1.start()
return conn, table
if __name__ == '__main__':
conn,table = main_0()
results_table(conn, table)
谢谢。
如前所述,您必须使用作用域会话,因此不要使用 conn:
from sqlalchemy.orm import sessionmaker, scoped_session
db_session = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine))
然后每当你在一个线程中连接到你的数据库时,记住打开和关闭一个会话,你不希望会话被线程共享,因为这会导致不好的事情,所以:
def worker(sth):
session = db_session()
"do sth very important"
res = session.query(YourModel).filter(YourModel.sth = sth).first()
print(res)
session.close()
然后你就可以在你的多线程操作中使用这样的工作者了。希望对您有所帮助。
让我在上面的答案中添加一些内容(因为我无法再对其进行编辑)。使用作用域会话,您可以调用其他与数据库连接的函数,并且不必将会话作为参数添加到这些函数,因为在同一线程中创建的每个下一个会话都将是同一会话。所以:
def worker(sth):
session = db_session()
"do sth very important"
res = session.query(YourModel).filter(YourModel.sth == sth).first()
your_very_importan_function(sth)
print(res)
db_session.remove()
def your_very_important_function(sth):
session = db_session() # this session will be the same as the session created in the worker function, and in the worker function, it is different for every thread.
"do sth even more important"
res = session.query(YourOtherModel).filter(YourOtherModel.sth == sth).first()
最后,您应该删除会话,而不仅仅是关闭它。
https://docs.sqlalchemy.org/en/13/orm/contextual.html
以下代码在 Python 3.6+ 中有效,在 Python 3.4.3 中无效,不确定在哪个版本中失败。为什么是这样?我的印象是 sqlalchemy 可能会将多个 readers/writers 隐藏到基于文件的数据库中,方法是将其隐藏在调用的序列化程序后面。无论如何,这表明我没有正确处理此问题 - 如何在版本 < 3.6 中插入多个线程或主线程中的一个线程?
我在 sqlalchemy session()
级别尝试过
但只能让它与引擎一起工作,现在我只能在 3.6 上找到它。
def insert_inventory_table(conn, table, inventory):
conn.execute(table.insert(), inventory)
def results_table(conn, table):
q = select([table])
data = conn.execute(q).fetchall()
print('{!r}'.format(data))
def main_0():
engine = create_engine('sqlite://', connect_args={'check_same_thread' : False})
conn = engine.connect()
metadata = MetaData(engine)
table = Table('inventory',
metadata,
Column('item_no', Integer, primary_key=True, autoincrement=True),
Column('desc', String(255), nullable=False),
Column('volume', Integer, nullable=False)
)
metadata.create_all()
some_inventory = [{'item_no' : 0, 'desc' : 'drone', 'volume' : 12},
{'item_no' : 1, 'desc' : 'puddle jumper', 'volume' : 2},
{'item_no' : 2, 'desc' : 'pet monkey', 'volume' : 1},
{'item_no' : 3, 'desc' : 'bowling ball', 'volume' : 4},
{'item_no' : 4, 'desc' : 'electric guitar', 'volume' : 3},
{'item_no' : 5, 'desc' : 'bookends', 'volume' : 2}]
thread_0 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[0:3]))
thread_1 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[3:]))
thread_0.start()
thread_1.start()
return conn, table
if __name__ == '__main__':
conn,table = main_0()
results_table(conn, table)
谢谢。
如前所述,您必须使用作用域会话,因此不要使用 conn:
from sqlalchemy.orm import sessionmaker, scoped_session
db_session = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine))
然后每当你在一个线程中连接到你的数据库时,记住打开和关闭一个会话,你不希望会话被线程共享,因为这会导致不好的事情,所以:
def worker(sth):
session = db_session()
"do sth very important"
res = session.query(YourModel).filter(YourModel.sth = sth).first()
print(res)
session.close()
然后你就可以在你的多线程操作中使用这样的工作者了。希望对您有所帮助。
让我在上面的答案中添加一些内容(因为我无法再对其进行编辑)。使用作用域会话,您可以调用其他与数据库连接的函数,并且不必将会话作为参数添加到这些函数,因为在同一线程中创建的每个下一个会话都将是同一会话。所以:
def worker(sth):
session = db_session()
"do sth very important"
res = session.query(YourModel).filter(YourModel.sth == sth).first()
your_very_importan_function(sth)
print(res)
db_session.remove()
def your_very_important_function(sth):
session = db_session() # this session will be the same as the session created in the worker function, and in the worker function, it is different for every thread.
"do sth even more important"
res = session.query(YourOtherModel).filter(YourOtherModel.sth == sth).first()
最后,您应该删除会话,而不仅仅是关闭它。 https://docs.sqlalchemy.org/en/13/orm/contextual.html