SQLAlchemy 线程池执行器 "Too many clients"
SQLAlchemy ThreadPoolExecutor "Too many clients"
我用这种逻辑编写了一个脚本,以便在生成许多记录时将它们插入 PostgreSQL table。
#!/usr/bin/env python3
# Continuously insert generated records into a PostgreSQL table, fanning
# the inserts out over a process pool driven by an asyncio event loop.
import asyncio
from concurrent.futures import ProcessPoolExecutor as pool
from functools import partial
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base

metadata = sa.MetaData(schema='Whosebug')
Base = declarative_base(metadata=metadata)


class Example(Base):
    """Minimal mapped table: integer primary key plus a text column."""
    __tablename__ = 'example'
    pk = sa.Column(sa.Integer, primary_key=True)
    text = sa.Column(sa.Text)


# Create the schema (if missing) right before the tables are created.
sa.event.listen(Base.metadata, 'before_create',
                sa.DDL('CREATE SCHEMA IF NOT EXISTS Whosebug'))

engine = sa.create_engine(
    'postgresql+psycopg2://postgres:password@localhost:5432/Whosebug'
)
Base.metadata.create_all(engine)

# NOTE(review): one module-level session shared by every worker — each
# forked process gets its own copy, but this is not safe across threads.
session = sa.orm.sessionmaker(bind=engine, autocommit=True)()


def task(value):
    # Discard any connections inherited from the parent process so this
    # worker process opens fresh ones.
    engine.dispose()
    with session.begin():
        session.add(Example(text=value))


async def infinite_task(loop):
    # Submit `task` to the loop's default executor (the process pool).
    spawn_task = partial(loop.run_in_executor, None, task)
    while True:
        # Fan out 10000 inserts, then wait for the whole batch to finish.
        await asyncio.wait([spawn_task(value) for value in range(10000)])


def main():
    loop = asyncio.get_event_loop()
    with pool() as executor:
        loop.set_default_executor(executor)
        asyncio.ensure_future(infinite_task(loop))
        loop.run_forever()
    loop.close()


if __name__ == '__main__':
    main()
这段代码工作得很好,创建了一个进程池,进程数与我拥有的 CPU 内核数一样多,并且一直愉快地运行下去。我想看看线程与进程相比表现如何,但我找不到一个有效的例子。以下是我所做的更改:
from concurrent.futures import ThreadPoolExecutor as pool

# scoped_session hands each thread its own thread-local Session.
session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True)
Session = sa.orm.scoped_session(session_maker)


def task(value):
    # NOTE(review): disposing the shared engine from worker threads makes
    # the pool forget connections other threads still hold — a likely
    # cause of the "too many clients" failure described below.
    engine.dispose()
    # create new session per thread
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # remove session once the work is done
    Session.remove()
此版本运行一段时间后出现 "too many clients" 异常:
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: sorry, too many clients already
我错过了什么?
看起来您打开了很多新连接但没有关闭它们,请尝试在之后添加 engine.dispose():
from concurrent.futures import ThreadPoolExecutor as pool

# scoped_session hands each thread its own thread-local Session.
session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True)
Session = sa.orm.scoped_session(session_maker)


def task(value):
    engine.dispose()
    # create new session per thread
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # remove session once the work is done
    Session.remove()
    # NOTE(review): a second dispose() after the work — the answer's
    # suggestion; it discards this thread's pooled connection each run.
    engine.dispose()
请记住新连接的成本,因此理想情况下每个 process/thread 应该有一个连接,但我不确定 ThreadPoolExecutor 是如何工作的,并且可能连接没有在线程执行完成时关闭。
原来问题出在 engine.dispose()
上,用 Mike Bayer (zzzeek) 的话来说,它 "is leaving PG connections lying open to be garbage collected."
来源:https://groups.google.com/forum/#!topic/sqlalchemy/zhjCBNebnDY
因此更新后的 task
函数如下所示:
def task(value):
    """Insert one Example row using this thread's scoped session."""
    # create new session per thread (scoped_session is thread-local)
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # BUG FIX: Session instances have no .remove(); remove() belongs to
    # the scoped_session registry (as the earlier snippet correctly does).
    # Discard the thread-local session once the work is done.
    Session.remove()
我用这种逻辑编写了一个脚本,以便在生成许多记录时将它们插入 PostgreSQL table。
#!/usr/bin/env python3
# Continuously insert generated records into a PostgreSQL table, fanning
# the inserts out over a process pool driven by an asyncio event loop.
import asyncio
from concurrent.futures import ProcessPoolExecutor as pool
from functools import partial
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base

metadata = sa.MetaData(schema='Whosebug')
Base = declarative_base(metadata=metadata)


class Example(Base):
    """Minimal mapped table: integer primary key plus a text column."""
    __tablename__ = 'example'
    pk = sa.Column(sa.Integer, primary_key=True)
    text = sa.Column(sa.Text)


# Create the schema (if missing) right before the tables are created.
sa.event.listen(Base.metadata, 'before_create',
                sa.DDL('CREATE SCHEMA IF NOT EXISTS Whosebug'))

engine = sa.create_engine(
    'postgresql+psycopg2://postgres:password@localhost:5432/Whosebug'
)
Base.metadata.create_all(engine)

# NOTE(review): one module-level session shared by every worker — each
# forked process gets its own copy, but this is not safe across threads.
session = sa.orm.sessionmaker(bind=engine, autocommit=True)()


def task(value):
    # Discard any connections inherited from the parent process so this
    # worker process opens fresh ones.
    engine.dispose()
    with session.begin():
        session.add(Example(text=value))


async def infinite_task(loop):
    # Submit `task` to the loop's default executor (the process pool).
    spawn_task = partial(loop.run_in_executor, None, task)
    while True:
        # Fan out 10000 inserts, then wait for the whole batch to finish.
        await asyncio.wait([spawn_task(value) for value in range(10000)])


def main():
    loop = asyncio.get_event_loop()
    with pool() as executor:
        loop.set_default_executor(executor)
        asyncio.ensure_future(infinite_task(loop))
        loop.run_forever()
    loop.close()


if __name__ == '__main__':
    main()
这段代码工作得很好,创建了一个进程池,进程数与我拥有的 CPU 内核数一样多,并且一直愉快地运行下去。我想看看线程与进程相比表现如何,但我找不到一个有效的例子。以下是我所做的更改:
from concurrent.futures import ThreadPoolExecutor as pool

# scoped_session hands each thread its own thread-local Session.
session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True)
Session = sa.orm.scoped_session(session_maker)


def task(value):
    # NOTE(review): disposing the shared engine from worker threads makes
    # the pool forget connections other threads still hold — a likely
    # cause of the "too many clients" failure described below.
    engine.dispose()
    # create new session per thread
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # remove session once the work is done
    Session.remove()
此版本运行一段时间后出现 "too many clients" 异常:
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: sorry, too many clients already
我错过了什么?
看起来您打开了很多新连接但没有关闭它们,请尝试在之后添加 engine.dispose():
from concurrent.futures import ThreadPoolExecutor as pool

# scoped_session hands each thread its own thread-local Session.
session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True)
Session = sa.orm.scoped_session(session_maker)


def task(value):
    engine.dispose()
    # create new session per thread
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # remove session once the work is done
    Session.remove()
    # NOTE(review): a second dispose() after the work — the answer's
    # suggestion; it discards this thread's pooled connection each run.
    engine.dispose()
请记住新连接的成本,因此理想情况下每个 process/thread 应该有一个连接,但我不确定 ThreadPoolExecutor 是如何工作的,并且可能连接没有在线程执行完成时关闭。
原来问题出在 engine.dispose()
上,用 Mike Bayer (zzzeek) 的话来说,它 "is leaving PG connections lying open to be garbage collected."
来源:https://groups.google.com/forum/#!topic/sqlalchemy/zhjCBNebnDY
因此更新后的 task
函数如下所示:
def task(value):
    """Insert one Example row using this thread's scoped session."""
    # create new session per thread (scoped_session is thread-local)
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # BUG FIX: Session instances have no .remove(); remove() belongs to
    # the scoped_session registry (as the earlier snippet correctly does).
    # Discard the thread-local session once the work is done.
    Session.remove()