多线程应用程序中的 SQLAlchemy - 超出锁定等待超时
SQLAlchemy in multithreaded application - Lock wait timeout exceeded
我在开发更新数据库的数据导入应用程序。就我而言,数据源是一个瓶颈,因此数据以多个线程的形式出现:
engine = create_engine(f"mysql+mysqldb://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_DB}")
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
def build_network():
...
for node_id in nodes:
thread = threading.Thread(
target=_build_page_nodes, args=(node_id, traversing_nodes, update),
name="Thread-" + node_id)
thread.start()
...
def _build_page_nodes(node_id, traversing_nodes, update):
session = Session()
...
logger.debug('Committing transaction')
session.commit()
Session.remove()
logger.debug('Done')
第一个线程执行正常。然而,下一个会导致问题。这是执行日志:
2021-05-05 09:52:02,846 INFO MainThread build_network 1 of 109015. 0 tasks are running
2021-05-05 09:52:04,811 DEBUG Thread-10000060 build_network Getting children for 10000060
2021-05-05 09:52:04,812 DEBUG Thread-10000060 build_network Committing transaction
2021-05-05 09:52:04,814 DEBUG Thread-10000060 build_network Done
2021-05-05 09:52:04,850 INFO MainThread build_network 2 of 109015. 0 tasks are running
2021-05-05 09:52:06,609 DEBUG Thread-10001280 build_network Getting children for 10001280
2021-05-05 09:52:06,610 DEBUG Thread-10001280 build_network Committing transaction
Exception in thread Thread-10001280:
Traceback (most recent call last):
File "/var/www/order/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
self.dialect.do_execute(
File "/var/www/order/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/var/www/order/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/var/www/order/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/var/www/order/lib/python3.8/site-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
The above exception was the direct cause of the following exception:
# Skipped the stacktrace
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: UPDATE network_nodes SET built_tree=%s WHERE network_nodes.id = %s]
[parameters: (1, '10001280')]
正如我所看到的,第一个线程的执行没有问题。所以我假设它使一些连接处于活动状态。但是如您所见,我在线程完成后关闭并删除了会话。
目前没有其他客户端正在使用该数据库。此外,当应用程序是单线程时,它 运行 可以。那么我完成线程的方式有问题吗?
事实证明问题不在于线程本身,而在于 SQLAlchemy 会话。在开始生成线程之前,我收集了要处理的节点列表。在我获得节点列表后,我没有使用 Session.remove()
删除作用域会话。这导致交易保持 运行。使用 运行 select * from information_schema.innodb_trx;
我可以看到。
我还是不知道为什么第一次更新记录成功了,第二次才出现问题。但既然我能修好它,我就没事了。
我在开发更新数据库的数据导入应用程序。就我而言,数据源是一个瓶颈,因此数据以多个线程的形式出现:
engine = create_engine(f"mysql+mysqldb://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_DB}")
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
def build_network():
...
for node_id in nodes:
thread = threading.Thread(
target=_build_page_nodes, args=(node_id, traversing_nodes, update),
name="Thread-" + node_id)
thread.start()
...
def _build_page_nodes(node_id, traversing_nodes, update):
session = Session()
...
logger.debug('Committing transaction')
session.commit()
Session.remove()
logger.debug('Done')
第一个线程执行正常。然而,下一个会导致问题。这是执行日志:
2021-05-05 09:52:02,846 INFO MainThread build_network 1 of 109015. 0 tasks are running
2021-05-05 09:52:04,811 DEBUG Thread-10000060 build_network Getting children for 10000060
2021-05-05 09:52:04,812 DEBUG Thread-10000060 build_network Committing transaction
2021-05-05 09:52:04,814 DEBUG Thread-10000060 build_network Done
2021-05-05 09:52:04,850 INFO MainThread build_network 2 of 109015. 0 tasks are running
2021-05-05 09:52:06,609 DEBUG Thread-10001280 build_network Getting children for 10001280
2021-05-05 09:52:06,610 DEBUG Thread-10001280 build_network Committing transaction
Exception in thread Thread-10001280:
Traceback (most recent call last):
File "/var/www/order/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
self.dialect.do_execute(
File "/var/www/order/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/var/www/order/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/var/www/order/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/var/www/order/lib/python3.8/site-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
The above exception was the direct cause of the following exception:
# Skipped the stacktrace
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: UPDATE network_nodes SET built_tree=%s WHERE network_nodes.id = %s]
[parameters: (1, '10001280')]
正如我所看到的,第一个线程的执行没有问题。所以我假设它使一些连接处于活动状态。但是如您所见,我在线程完成后关闭并删除了会话。
目前没有其他客户端正在使用该数据库。此外,当应用程序是单线程时,它 运行 可以。那么我完成线程的方式有问题吗?
事实证明问题不在于线程本身,而在于 SQLAlchemy 会话。在开始生成线程之前,我收集了要处理的节点列表。在我获得节点列表后,我没有使用 Session.remove()
删除作用域会话。这导致交易保持 运行。使用 运行 select * from information_schema.innodb_trx;
我可以看到。
我还是不知道为什么第一次更新记录成功了,第二次才出现问题。但既然我能修好它,我就没事了。