Multithreading makes me get the "ValueError: I/O operation on closed file" error. Why?
Multithreading makes me get the "ValueError: I/O operation on closed file" error. Why?
我正在使用 WTForms 编写 Flask Web 应用程序。在其中一种形式中,用户应上传 csv 文件,服务器将分析接收到的数据。这是我正在使用的代码。
filename = token_hex(8) + '.csv' # Generate a new filename
form.dataset.data.save('myapp/datasets/' + filename) # Save the received file
dataset = genfromtxt('myapp/datasets/' + filename, delimiter=',') # Open the newly generated file
# analyze 'dataset'
只要我在单线程应用程序中使用这段代码,一切都正常。我尝试在代码中添加一个线程。这是线程调用的过程(函数内的完全相同的代码):
def execute_analysis(form):
filename = token_hex(8) + '.csv' # Generate a new filename
form.dataset.data.save('myapp/datasets/' + filename) # Save the received file
dataset = genfromtxt('myapp/datasets/' + filename, delimiter=',') # Open the newly generated file
# analyze 'dataset'
这是我调用线程的方式
import threading
@posts.route("/estimation", methods=['GET', 'POST'])
@login_required
def estimate_parameters():
form = EstimateForm()
if form.validate_on_submit():
threading.Thread(target=execute_analysis, args=[form]).start()
flash("Your request has been received. Please check the site in again in a few minutes.", category='success')
# return render_template('posts/post.html', title=post.id, post=post)
return render_template('estimations/estimator.html', title='New Analysis', form=form, legend='New Analysis')
但现在我得到以下错误:
ValueError: I/O operation on closed file.
相对于save
函数调用。为什么它不起作用?我应该如何解决这个问题?
我对框架的了解还不足以准确说明发生了什么,但我可以告诉您如何修复它。
当你有一个被多个线程共享的资源时,使用锁。
from threading import Lock
LOCK = Lock()
def process():
LOCK.acquire()
... # open a file, write some data to it etc.
LOCK.release()
# alternatively, use the context manager syntax
with LOCK:
...
threading.Thread(target=process).start()
threading.Thread(target=process).start()
Documentation on threading.Lock
:
The class implementing primitive lock objects. Once a thread has acquired a lock, subsequent attempts to acquire it block, until it is released
基本上,在线程 1 调用 LOCK.acquire()
之后,后续调用例如来自其他线程,将导致这些线程冻结并等待调用 LOCK.release()
(通常是线程 1,在它完成与资源的业务之后)。
如果文件名是随机生成的,那么我不认为 1 个线程关闭另一个线程会出现问题,除非它们碰巧生成相同的名称。但也许你可以通过一些实验来弄清楚,例如首先尝试锁定对 save
和 genfromtxt
的调用,然后检查是否有帮助。添加一些 print
语句(或者更好,使用 logging
)也可能有意义,例如检查文件名是否不冲突。
如果没有进一步的上下文很难判断,但我怀疑您很可能是从函数返回或退出上下文管理器,这会导致某些文件描述符关闭,从而导致 save(..)
调用失败 ValueError
.
如果是这样,一种直接的解决方法是等待线程在 returning/closing 文件之前完成。大致如下:
def handle_request(form):
...
analyzer_thread = threading.Thread(target=execute_analysis, args=[form])
analyzer_thread.start()
...
analyzer_thread.join() # wait for completion of execute_analysis
cleanup_context(form)
return
这是我描述的问题的一个可重现的最小示例:
import threading
SEM = threading.Semaphore(0)
def run(fd):
SEM.acquire() # wait till release
fd.write("This will fail :(")
fd = open("test.txt", "w+")
other_thread = threading.Thread(target=run, args=[fd])
other_thread.start()
fd.close()
SEM.release() # release the semaphore, so other_thread will acquire & proceed
other_thread.join()
请注意,主线程将关闭文件,而另一个线程将在 write
调用 ValueError: I/O operation on closed file.
时失败,就像您的情况一样。
我正在使用 WTForms 编写 Flask Web 应用程序。在其中一种形式中,用户应上传 csv 文件,服务器将分析接收到的数据。这是我正在使用的代码。
filename = token_hex(8) + '.csv' # Generate a new filename
form.dataset.data.save('myapp/datasets/' + filename) # Save the received file
dataset = genfromtxt('myapp/datasets/' + filename, delimiter=',') # Open the newly generated file
# analyze 'dataset'
只要我在单线程应用程序中使用这段代码,一切都正常。我尝试在代码中添加一个线程。这是线程调用的过程(函数内的完全相同的代码):
def execute_analysis(form):
filename = token_hex(8) + '.csv' # Generate a new filename
form.dataset.data.save('myapp/datasets/' + filename) # Save the received file
dataset = genfromtxt('myapp/datasets/' + filename, delimiter=',') # Open the newly generated file
# analyze 'dataset'
这是我调用线程的方式
import threading
@posts.route("/estimation", methods=['GET', 'POST'])
@login_required
def estimate_parameters():
form = EstimateForm()
if form.validate_on_submit():
threading.Thread(target=execute_analysis, args=[form]).start()
flash("Your request has been received. Please check the site in again in a few minutes.", category='success')
# return render_template('posts/post.html', title=post.id, post=post)
return render_template('estimations/estimator.html', title='New Analysis', form=form, legend='New Analysis')
但现在我得到以下错误:
ValueError: I/O operation on closed file.
相对于save
函数调用。为什么它不起作用?我应该如何解决这个问题?
我对框架的了解还不足以准确说明发生了什么,但我可以告诉您如何修复它。
当你有一个被多个线程共享的资源时,使用锁。
from threading import Lock
LOCK = Lock()
def process():
LOCK.acquire()
... # open a file, write some data to it etc.
LOCK.release()
# alternatively, use the context manager syntax
with LOCK:
...
threading.Thread(target=process).start()
threading.Thread(target=process).start()
Documentation on threading.Lock
:
The class implementing primitive lock objects. Once a thread has acquired a lock, subsequent attempts to acquire it block, until it is released
基本上,在线程 1 调用 LOCK.acquire()
之后,后续调用例如来自其他线程,将导致这些线程冻结并等待调用 LOCK.release()
(通常是线程 1,在它完成与资源的业务之后)。
如果文件名是随机生成的,那么我不认为 1 个线程关闭另一个线程会出现问题,除非它们碰巧生成相同的名称。但也许你可以通过一些实验来弄清楚,例如首先尝试锁定对 save
和 genfromtxt
的调用,然后检查是否有帮助。添加一些 print
语句(或者更好,使用 logging
)也可能有意义,例如检查文件名是否不冲突。
如果没有进一步的上下文很难判断,但我怀疑您很可能是从函数返回或退出上下文管理器,这会导致某些文件描述符关闭,从而导致 save(..)
调用失败 ValueError
.
如果是这样,一种直接的解决方法是等待线程在 returning/closing 文件之前完成。大致如下:
def handle_request(form):
...
analyzer_thread = threading.Thread(target=execute_analysis, args=[form])
analyzer_thread.start()
...
analyzer_thread.join() # wait for completion of execute_analysis
cleanup_context(form)
return
这是我描述的问题的一个可重现的最小示例:
import threading
SEM = threading.Semaphore(0)
def run(fd):
SEM.acquire() # wait till release
fd.write("This will fail :(")
fd = open("test.txt", "w+")
other_thread = threading.Thread(target=run, args=[fd])
other_thread.start()
fd.close()
SEM.release() # release the semaphore, so other_thread will acquire & proceed
other_thread.join()
请注意,主线程将关闭文件,而另一个线程将在 write
调用 ValueError: I/O operation on closed file.
时失败,就像您的情况一样。