Concurrent.futures and SQLAlchemy benchmarks vs. synchronous code

I have a project that requires uploading roughly 70 files to my Flask application. As I am currently learning about concurrency, this seemed like the perfect use case. When only using print statements, the concurrent version of this function is about 2x-2.5x faster than the synchronous one.

When actually writing to my SQLite database, however, it takes roughly the same amount of time.

Original function:

@app.route('/test_sync')
def auto_add():

    t0 = time.time()

    # Code does not work without changing directory. better option?
    os.chdir('my_app/static/tracks')

    list_dir = os.listdir('my_app/static/tracks')

    # list_dir consists of .mp3 and .jpg files
    for filename in list_dir:
        if filename.endswith('.mp3'):
            try:
                thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
            except Exception:
                print(f'ERROR - COULD NOT FIND THUMB for { filename }')

            resize_image(thumbnail)

            with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:

                track = Track(
                    title=filename[15:-4], 
                    artist='Sam Gellaitry',
                    description='No desc.', 
                    thumbnail=t.read(),
                    binary_audio=f.read()
                )

        else:
            continue

        db.session.add(track)

    db.session.commit()
    elapsed = time.time() - t0

    return f'Uploaded all tracks in {elapsed} seconds.'

Concurrent function:

@app.route('/test_concurrent')
def auto_add_concurrent():

    t0 = time.time()
    MAX_WORKERS = 40

    os.chdir('/my_app/static/tracks')
    list_dir = os.listdir('/my_app/static/tracks')
    mp3_list = [x for x in list_dir if x.endswith('.mp3')]

    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        res = executor.map(add_one_file, mp3_list)

    for x in res:
        db.session.add(x)

    db.session.commit()
    elapsed = time.time() - t0

    return f'Uploaded all tracks in {elapsed} seconds.'

----- 

def add_one_file(filename):

    list_dir = os.listdir('/my_app/static/tracks')

    try:
        thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]

    except Exception:
        print(f'ERROR - COULD NOT FIND THUMB for { filename }')

    resize_image(thumbnail)

    with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:

        track = Track(
            title=filename[15:-4], 
            artist='Sam Gellaitry',
            description='No desc.', 
            thumbnail=t.read(),
            binary_audio=f.read()
        )

    return track

And for completeness, here is the resize_image function:

def resize_image(thumbnail):

    with Image.open(thumbnail) as img:
        # resize() returns a new Image rather than modifying in place
        img = img.resize((500, 500))
        img.save(thumbnail)

    return thumbnail

And the benchmarks:

/test_concurrent (with print statements)
Uploaded all tracks in 0.7054300308227539 seconds.

/test_sync
Uploaded all tracks in 1.8661110401153564 seconds.

------
/test_concurrent (with db.session.add/db.session.commit)
Uploaded all tracks in 5.303245782852173 seconds.

/test_sync 
Uploaded all tracks in 6.123792886734009 seconds.

What am I doing wrong with this concurrent code, and how can I optimize it?

It would seem that the database writes dominate your timing, and they generally do not benefit from parallelization when you are writing many rows to the same table, or in the case of SQLite, to the same database. Instead of adding the ORM objects to the session one by one, perform a bulk insert:

db.session.bulk_save_objects(list(res))

In your current code the ORM has to insert the Track objects one at a time during the flush that precedes the commit, in order to fetch their primary keys after each insert. Session.bulk_save_objects does not do that by default, which on the other hand means that the objects are less usable afterwards (they are not added to the session, for instance), but that does not seem to be an issue in your case.
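
Putting that together, the route could then look something like this sketch (the route name /test_concurrent_bulk is made up here; also, if you later need the generated primary keys, bulk_save_objects accepts return_defaults=True at the cost of some of the speedup):

@app.route('/test_concurrent_bulk')
def auto_add_concurrent_bulk():

    t0 = time.time()
    MAX_WORKERS = 40

    os.chdir('/my_app/static/tracks')
    list_dir = os.listdir('/my_app/static/tracks')
    mp3_list = [x for x in list_dir if x.endswith('.mp3')]

    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        # Build the Track objects concurrently, but keep all
        # database work in the main thread.
        tracks = list(executor.map(add_one_file, mp3_list))

    # One bulk INSERT instead of ~70 individual ones.
    db.session.bulk_save_objects(tracks)
    db.session.commit()

    elapsed = time.time() - t0
    return f'Uploaded all tracks in {elapsed} seconds.'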

"I’m inserting 400,000 rows with the ORM and it’s really slow!" 是一本很好的读物。


As a side note, when working with files it is best to avoid TOCTOU (time-of-check to time-of-use) situations where possible. In other words, do not use

thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]

to check whether a file exists; use os.path.isfile() or similar instead if you must, but preferably just try to open the file and handle the error if it cannot be opened:

thumbnail = filename[:-4] + '.jpg'

try:
    resize_image(thumbnail)

except FileNotFoundError:
    print(f'ERROR - COULD NOT FIND THUMB for { filename }')
    # Note that the latter open attempt will fail as well, if this fails

...
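
Put together, an EAFP-style add_one_file could look something like the sketch below. Note that it skips tracks whose thumbnail is missing (an assumption about the behaviour you want) and drops the redundant os.listdir() call:

def add_one_file(filename):

    thumbnail = filename[:-4] + '.jpg'

    try:
        # Just try to use the file; no separate existence check.
        resize_image(thumbnail)
    except FileNotFoundError:
        print(f'ERROR - COULD NOT FIND THUMB for { filename }')
        return None  # assumption: skip tracks without a thumbnail

    with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:
        track = Track(
            title=filename[15:-4],
            artist='Sam Gellaitry',
            description='No desc.',
            thumbnail=t.read(),
            binary_audio=f.read()
        )

    return track

The caller then only has to drop the Nones before the bulk insert, e.g. tracks = [t for t in res if t is not None].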