Python 多处理池在映射调用时挂起
Python multiprocessing pool hangs on map call
我有一个函数可以使用 SQLAlchemy 解析文件并将数据插入 MySQL。我已经 运行 按 os.listdir()
的结果顺序执行函数,一切正常。
因为大部分时间都花在读取文件和写入数据库上,所以我想使用多处理来加快速度。这是我的伪代码,因为实际代码太长了:
def parse_file(filename):
f = open(filename, 'rb')
data = f.read()
f.close()
soup = BeautifulSoup(data,features="lxml", from_encoding='utf-8')
# parse file here
db_record = MyDBRecord(parsed_data)
session.add(db_record)
session.commit()
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
我遇到的问题是脚本挂起并且永远不会完成。我通常将 63 条记录中的 48 条存入数据库。有时多,有时少。
我试过使用 pool.close()
并与 pool.join()
结合使用,但似乎都无济于事。
如何完成此脚本?我究竟做错了什么?我在 Linux 盒子上使用 Python 2.7.8。
您需要将所有使用多处理的代码放在它自己的函数中。当 multiprocessing 在单独的进程中重新导入您的模块时,这会阻止它递归地启动新池:
def parse_file(filename):
...
def main():
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
if __name__ == '__main__:
main()
请参阅有关 making sure your module is importable, also the advice for running on Windows(tm)
的文档
问题是两件事的结合:
- 我的池代码被多次调用(感谢@Peter Wood)
- 我的数据库代码打开太多会话(and/or)共享会话
我进行了以下更改,现在一切正常:
原始文件
def parse_file(filename):
f = open(filename, 'rb')
data = f.read()
f.close()
soup = BeautifulSoup(data,features="lxml", from_encoding='utf-8')
# parse file here
db_record = MyDBRecord(parsed_data)
session = get_session() # see below
session.add(db_record)
session.commit()
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
数据库文件
def get_session():
engine = create_engine('mysql://root:root@localhost/my_db')
Base.metadata.create_all(engine)
Base.metadata.bind = engine
db_session = sessionmaker(bind=engine)
return db_session()
我有一个函数可以使用 SQLAlchemy 解析文件并将数据插入 MySQL。我已经 运行 按 os.listdir()
的结果顺序执行函数,一切正常。
因为大部分时间都花在读取文件和写入数据库上,所以我想使用多处理来加快速度。这是我的伪代码,因为实际代码太长了:
def parse_file(filename):
f = open(filename, 'rb')
data = f.read()
f.close()
soup = BeautifulSoup(data,features="lxml", from_encoding='utf-8')
# parse file here
db_record = MyDBRecord(parsed_data)
session.add(db_record)
session.commit()
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
我遇到的问题是脚本挂起并且永远不会完成。我通常将 63 条记录中的 48 条存入数据库。有时多,有时少。
我试过使用 pool.close()
并与 pool.join()
结合使用,但似乎都无济于事。
如何完成此脚本?我究竟做错了什么?我在 Linux 盒子上使用 Python 2.7.8。
您需要将所有使用多处理的代码放在它自己的函数中。当 multiprocessing 在单独的进程中重新导入您的模块时,这会阻止它递归地启动新池:
def parse_file(filename):
...
def main():
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
if __name__ == '__main__:
main()
请参阅有关 making sure your module is importable, also the advice for running on Windows(tm)
的文档问题是两件事的结合:
- 我的池代码被多次调用(感谢@Peter Wood)
- 我的数据库代码打开太多会话(and/or)共享会话
我进行了以下更改,现在一切正常: 原始文件
def parse_file(filename):
f = open(filename, 'rb')
data = f.read()
f.close()
soup = BeautifulSoup(data,features="lxml", from_encoding='utf-8')
# parse file here
db_record = MyDBRecord(parsed_data)
session = get_session() # see below
session.add(db_record)
session.commit()
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
数据库文件
def get_session():
engine = create_engine('mysql://root:root@localhost/my_db')
Base.metadata.create_all(engine)
Base.metadata.bind = engine
db_session = sessionmaker(bind=engine)
return db_session()