SQLAlchemy in multithreaded application - Lock wait timeout exceeded

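I am developing a data import application that updates a database. In my case the data source is the bottleneck, so the data is fetched in multiple threads: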

engine = create_engine(f"mysql+mysqldb://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_DB}")
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)

def build_network():
...
    for node_id in nodes:
        thread = threading.Thread(
            target=_build_page_nodes, args=(node_id, traversing_nodes, update),
            name="Thread-" + node_id)
        thread.start()
...

def _build_page_nodes(node_id, traversing_nodes, update):
    session = Session()
...
    logger.debug('Committing transaction')
    session.commit()
    Session.remove()
    logger.debug('Done')

The first thread runs fine. The next one, however, fails. Here is the execution log:

2021-05-05 09:52:02,846 INFO    MainThread      build_network   1 of 109015. 0 tasks are running
2021-05-05 09:52:04,811 DEBUG   Thread-10000060 build_network   Getting children for 10000060
2021-05-05 09:52:04,812 DEBUG   Thread-10000060 build_network   Committing transaction
2021-05-05 09:52:04,814 DEBUG   Thread-10000060 build_network   Done
2021-05-05 09:52:04,850 INFO    MainThread      build_network   2 of 109015. 0 tasks are running
2021-05-05 09:52:06,609 DEBUG   Thread-10001280 build_network   Getting children for 10001280
2021-05-05 09:52:06,610 DEBUG   Thread-10001280 build_network   Committing transaction
Exception in thread Thread-10001280:
Traceback (most recent call last):
  File "/var/www/order/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    self.dialect.do_execute(
  File "/var/www/order/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/var/www/order/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/var/www/order/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/var/www/order/lib/python3.8/site-packages/MySQLdb/connections.py", line 259, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')

The above exception was the direct cause of the following exception:

# Skipped the stacktrace
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: UPDATE network_nodes SET built_tree=%s WHERE network_nodes.id = %s]
[parameters: (1, '10001280')]

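As far as I can see, the first thread executed without issues, so I assume it left some connection alive. But as you can see, I close and remove the session once the thread is done.

No other clients are using the database at the moment. Also, the application runs fine when it is single-threaded. So is there something wrong with the way I finish the threads?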

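It turned out the problem was not the threads themselves but the SQLAlchemy session. Before I start spawning threads, I collect the list of nodes to process. After getting that list, I did not remove the scoped session with Session.remove(), which left a transaction open in the main thread. I could see it by running select * from information_schema.innodb_trx;.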
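
Roughly, the fix looks like the sketch below. It is not my actual code: the in-memory SQLite engine, the table contents, and the collect_node_ids() helper are assumptions made only so the snippet runs on its own. The point is that the main thread calls Session.remove() as soon as it has read the node list, so its transaction is over before any worker thread starts.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: an in-memory SQLite database stands in for the MySQL server.
engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))

# Hypothetical sample data, just enough to have something to read.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE network_nodes (id TEXT PRIMARY KEY, built_tree INTEGER)"))
    conn.execute(text("INSERT INTO network_nodes VALUES ('10000060', 0), ('10001280', 0)"))


def collect_node_ids():
    """Read the ids to process in the main thread, then release the session."""
    session = Session()
    try:
        rows = session.execute(
            text("SELECT id FROM network_nodes WHERE built_tree = 0 ORDER BY id")
        )
        # Materialize the result before the session goes away.
        return [row[0] for row in rows]
    finally:
        # Ends the main thread's transaction and discards its scoped session,
        # so nothing is left open while the worker threads run.
        Session.remove()


nodes = collect_node_ids()
```

After this call the main thread holds no session, and each worker thread gets its own one from Session() as in the question.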

I still don't know why the first update succeeded and only the second one failed. But since I was able to fix it, I'm fine with that.