Concurrent.futures and SQLAlchemy benchmarks vs. synchronous code
I have a project where I need to upload roughly 70 files to my Flask application. I'm learning concurrency right now, so this seemed like perfect practice. When using print statements, the concurrent version of this function is about 2 to 2.5 times faster than the synchronous one.

When actually writing to an SQLite database, though, it takes about the same amount of time.

Original function:
@app.route('/test_sync')
def auto_add():
    t0 = time.time()

    # Code does not work without changing directory. better option?
    os.chdir('my_app/static/tracks')
    list_dir = os.listdir('my_app/static/tracks')

    # list_dir consists of .mp3 and .jpg files
    for filename in list_dir:
        if filename.endswith('.mp3'):
            try:
                thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
            except Exception:
                print(f'ERROR - COULD NOT FIND THUMB for { filename }')

            resize_image(thumbnail)

            with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:
                track = Track(
                    title=filename[15:-4],
                    artist='Sam Gellaitry',
                    description='No desc.',
                    thumbnail=t.read(),
                    binary_audio=f.read()
                )
        else:
            continue

        db.session.add(track)

    db.session.commit()
    elapsed = time.time() - t0
    return f'Uploaded all tracks in {elapsed} seconds.'
Concurrent function:
@app.route('/test_concurrent')
def auto_add_concurrent():
    t0 = time.time()

    MAX_WORKERS = 40
    os.chdir('/my_app/static/tracks')
    list_dir = os.listdir('/my_app/static/tracks')
    mp3_list = [x for x in list_dir if x.endswith('.mp3')]

    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        res = executor.map(add_one_file, mp3_list)

    for x in res:
        db.session.add(x)
    db.session.commit()

    elapsed = time.time() - t0
    return f'Uploaded all tracks in {elapsed} seconds.'
-----
def add_one_file(filename):
    list_dir = os.listdir('/my_app/static/tracks')
    try:
        thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
    except Exception:
        print(f'ERROR - COULD NOT FIND THUMB for { filename }')

    resize_image(thumbnail)

    with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:
        track = Track(
            title=filename[15:-4],
            artist='Sam Gellaitry',
            description='No desc.',
            thumbnail=t.read(),
            binary_audio=f.read()
        )
    return track
And for completeness, here is the resize_image function:
def resize_image(thumbnail):
    with Image.open(thumbnail) as img:
        img.resize((500, 500))
        img.save(thumbnail)
    return thumbnail
And the benchmarks:
/test_concurrent (with print statements)
Uploaded all tracks in 0.7054300308227539 seconds.
/test_sync
Uploaded all tracks in 1.8661110401153564 seconds.
------
/test_concurrent (with db.session.add/db.session.commit)
Uploaded all tracks in 5.303245782852173 seconds.
/test_sync
Uploaded all tracks in 6.123792886734009 seconds.
What am I doing wrong in this concurrent code, and how can I optimize it?
It seems that the database writes dominate your timings, and writes generally do not benefit from parallelization when many rows go to the same table, or, in the case of SQLite, to the same database, since SQLite allows only one writer at a time. Instead of adding the ORM objects to the session one by one, perform a bulk insert:
db.session.bulk_save_objects(list(res))
In your current code the ORM has to insert the Track objects one at a time, during the flush just before the commit, in order to fetch their primary keys after insertion. Session.bulk_save_objects does not do that by default, which means that the objects are less usable afterwards (they are not added to the session, for example), but that does not seem to be an issue in your case.
"I’m inserting 400,000 rows with the ORM and it’s really slow!" 是一本很好的读物。
As a side note, when working with files it is best to avoid TOCTOU (time-of-check to time-of-use) situations where possible. In other words, don't use
thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
to check whether the file exists (use os.path.isfile() or similar if you must); instead, just try to open it and handle the error if it cannot be opened:
thumbnail = filename[:-4] + '.jpg'

try:
    resize_image(thumbnail)
except FileNotFoundError:
    print(f'ERROR - COULD NOT FIND THUMB for { filename }')
    # Note that the later open attempt will fail as well, if this fails

...
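Applied to add_one_file, that removes the os.listdir call and the lookup list comprehension entirely. Here is a sketch; returning None for a missing thumbnail, and skipping that track, is my assumption about the desired behaviour:

def add_one_file(filename):
    thumbnail = filename[:-4] + '.jpg'
    try:
        resize_image(thumbnail)
    except FileNotFoundError:
        print(f'ERROR - COULD NOT FIND THUMB for { filename }')
        return None  # assumption: skip tracks that have no thumbnail

    with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:
        return Track(
            title=filename[15:-4],
            artist='Sam Gellaitry',
            description='No desc.',
            thumbnail=t.read(),
            binary_audio=f.read()
        )

The caller then needs to drop the skipped entries before the bulk insert, e.g. db.session.bulk_save_objects([t for t in res if t is not None]).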