Multithreading makes me get the "ValueError: I/O operation on closed file" error. Why?

Multithreading makes me get the "ValueError: I/O operation on closed file" error. Why?

我正在使用 WTForms 编写 Flask Web 应用程序。在其中一种形式中,用户应上传 csv 文件,服务器将分析接收到的数据。这是我正在使用的代码。

filename = token_hex(8) + '.csv'  # Generate a new filename
form.dataset.data.save('myapp/datasets/' + filename)  # Save the received file
dataset = genfromtxt('myapp/datasets/' + filename, delimiter=',')  # Open the newly generated file
# analyze 'dataset' 

只要我在单线程应用程序中使用这段代码,一切都正常。我尝试在代码中添加一个线程。这是线程调用的过程(函数内的完全相同的代码):

def execute_analysis(form):
    filename = token_hex(8) + '.csv'  # Generate a new filename
    form.dataset.data.save('myapp/datasets/' + filename)  # Save the received file
    dataset = genfromtxt('myapp/datasets/' + filename, delimiter=',')  # Open the newly generated file
    # analyze 'dataset'

这是我调用线程的方式

import threading

@posts.route("/estimation", methods=['GET', 'POST'])
@login_required
def estimate_parameters():
    form = EstimateForm()
    if form.validate_on_submit():
        threading.Thread(target=execute_analysis, args=[form]).start()
        flash("Your request has been received. Please check the site in again in a few minutes.", category='success')
        # return render_template('posts/post.html', title=post.id, post=post)
    return render_template('estimations/estimator.html', title='New Analysis', form=form, legend='New Analysis')

但现在我得到以下错误:

ValueError: I/O operation on closed file.

相对于save函数调用。为什么它不起作用?我应该如何解决这个问题?

我对框架的了解还不足以准确说明发生了什么,但我可以告诉您如何修复它。

当你有一个被多个线程共享的资源时,使用锁。

from threading import Lock

LOCK = Lock()

def process():

    LOCK.acquire()
    ...  # open a file, write some data to it etc.
    LOCK.release()

    # alternatively, use the context manager syntax
    with LOCK:
        ...

threading.Thread(target=process).start()
threading.Thread(target=process).start()

Documentation on threading.Lock:

The class implementing primitive lock objects. Once a thread has acquired a lock, subsequent attempts to acquire it block, until it is released

基本上,在线程 1 调用 LOCK.acquire() 之后,后续调用例如来自其他线程,将导致这些线程冻结并等待调用 LOCK.release()(通常是线程 1,在它完成与资源的业务之后)。

如果文件名是随机生成的,那么我不认为 1 个线程关闭另一个线程会出现问题,除非它们碰巧生成相同的名称。但也许你可以通过一些实验来弄清楚,例如首先尝试锁定对 savegenfromtxt 的调用,然后检查是否有帮助。添加一些 print 语句(或者更好,使用 logging)也可能有意义,例如检查文件名是否不冲突。

如果没有进一步的上下文很难判断,但我怀疑您很可能是从函数返回或退出上下文管理器,这会导致某些文件描述符关闭,从而导致 save(..) 调用失败 ValueError.

如果是这样,一种直接的解决方法是等待线程在 returning/closing 文件之前完成。大致如下:

def handle_request(form):
  ...
  analyzer_thread = threading.Thread(target=execute_analysis, args=[form])
  analyzer_thread.start() 
  ...
  analyzer_thread.join() # wait for completion of execute_analysis
  cleanup_context(form)
  return

这是我描述的问题的一个可重现的最小示例:

import threading

SEM = threading.Semaphore(0)

def run(fd):
    SEM.acquire() # wait till release
    fd.write("This will fail :(")

fd = open("test.txt", "w+")
other_thread = threading.Thread(target=run, args=[fd])
other_thread.start()

fd.close()
SEM.release() # release the semaphore, so other_thread will acquire & proceed
other_thread.join()

请注意,主线程将关闭文件,而另一个线程将在 write 调用 ValueError: I/O operation on closed file. 时失败,就像您的情况一样。