在线程中使用 Popen 会阻塞每个传入的 Flask-SocketIO 请求
Using Popen in a thread blocks every incoming Flask-SocketIO request
我遇到以下情况:
我在 socketio 服务器上收到一个请求。我回答它 (socket.emit(..)) 然后 然后 在另一个线程 .[=27= 中开始一些计算负载很大的东西 ]
如果繁重的计算是由 subprocess.Popen
(使用 subprocess.PIPE
)引起的,它会完全阻止每个正在执行的传入请求,尽管它发生在一个单独的线程中。
没问题 - 在 this thread 中建议异步读取缓冲区大小为 1 的子进程的结果,以便在这些读取之间其他线程有机会做一些事情。不幸的是,这对我没有帮助。
我也已经 eventlet 并且工作正常 - 只要我不在线程中使用 subprocess.Popen
和 subprocess.PIPE
。
在此代码示例中,您可以看到它仅在使用 subprocess.Popen
和 subprocess.PIPE
时发生。当取消注释 #functionWithSimulatedHeavyLoad()
并改为注释 functionWithHeavyLoad()
时,一切都像魅力一样。
from flask import Flask
from flask.ext.socketio import SocketIO, emit
import eventlet
eventlet.monkey_patch()
app = Flask(__name__)
socketio = SocketIO(app)
import time
from threading import Thread
@socketio.on('client command')
def response(data, type = None, nonce = None):
socketio.emit('client response', ['foo'])
thread = Thread(target = testThreadFunction)
thread.daemon = True
thread.start()
def testThreadFunction():
#functionWithSimulatedHeavyLoad()
functionWithHeavyLoad()
def functionWithSimulatedHeavyLoad():
time.sleep(5)
def functionWithHeavyLoad():
from datetime import datetime
import subprocess
import sys
from queue import Queue, Empty
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueueOutput(out, queue):
for line in iter(out.readline, b''):
if line == '':
break
queue.put(line)
out.close()
# just anything that takes long to be computed
shellCommand = 'find / test'
p = subprocess.Popen(shellCommand, universal_newlines=True, shell=True, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target = enqueueOutput, args = (p.stdout, q))
t.daemon = True
t.start()
t.join()
text = ''
while True:
try:
line = q.get_nowait()
text += line
print(line)
except Empty:
break
socketio.emit('client response', {'text': text})
socketio.run(app)
客户端在functionWithHeavyLoad()函数中的阻塞工作完成后收到消息'foo'。不过它应该更早收到消息。
可以将此示例复制并粘贴到 .py 文件中,并且可以立即重现该行为。
我正在使用 Python 3.4.3,Flask 0.10.1,flask-socketio1.2,eventlet 0.17.4
更新
如果我将它放入 functionWithHeavyLoad 函数中,它实际上可以工作并且一切正常:
import shlex
shellCommand = shlex.split('find / test')
popen = subprocess.Popen(shellCommand, stdout=subprocess.PIPE)
lines_iterator = iter(popen.stdout.readline, b"")
for line in lines_iterator:
print(line)
eventlet.sleep()
问题是:我使用 find
重载是为了让您的示例更容易重现。但是,在我的代码中,我实际上使用 tesseract "{0}" stdout -l deu
作为卖出命令。这(与 find
不同)仍然会阻止所有内容。这是一个 tesseract
问题而不是 eventlet?但是仍然:如果它发生在一个单独的线程中,当 find
不阻塞时,它会随着上下文切换逐行读取,怎么会阻塞?
感谢这个问题,我今天学到了一些新东西。 Eventlet 确实提供了 subprocess 及其功能的 greenlet 友好版本,但由于某些奇怪的原因,它没有在标准库中修补此模块。
Link 到子流程的 eventlet 实现:https://github.com/eventlet/eventlet/blob/master/eventlet/green/subprocess.py
查看eventletpatcher,打补丁的模块有os、select、socket、thread、time、MySQLdb、builtins和psycopg2。补丁程序中绝对没有对子进程的引用。
好消息是我能够在与您的应用程序非常相似的应用程序中使用 Popen()
,在我替换之后:
import subprocess
与:
from eventlet.green import subprocess
但注意目前发布的eventlet版本(0.17.4)不支持Popen
中的universal_newlines
选项,使用会报错。 master 中支持此选项(这里是添加该选项的 commit)。您要么必须从您的呼叫中删除该选项,要么直接从 github.
安装 eventlet 的主分支
我遇到以下情况:
我在 socketio 服务器上收到一个请求。我回答它 (socket.emit(..)) 然后 然后 在另一个线程 .[=27= 中开始一些计算负载很大的东西 ]
如果繁重的计算是由 没问题 - 在 this thread 中建议异步读取缓冲区大小为 1 的子进程的结果,以便在这些读取之间其他线程有机会做一些事情。不幸的是,这对我没有帮助。 我也已经 在此代码示例中,您可以看到它仅在使用 客户端在functionWithHeavyLoad()函数中的阻塞工作完成后收到消息'foo'。不过它应该更早收到消息。 可以将此示例复制并粘贴到 .py 文件中,并且可以立即重现该行为。 我正在使用 Python 3.4.3,Flask 0.10.1,flask-socketio1.2,eventlet 0.17.4 更新 如果我将它放入 functionWithHeavyLoad 函数中,它实际上可以工作并且一切正常: 问题是:我使用 subprocess.Popen
(使用 subprocess.PIPE
)引起的,它会完全阻止每个正在执行的传入请求,尽管它发生在一个单独的线程中。subprocess.Popen
和 subprocess.PIPE
。subprocess.Popen
和 subprocess.PIPE
时发生。当取消注释 #functionWithSimulatedHeavyLoad()
并改为注释 functionWithHeavyLoad()
时,一切都像魅力一样。from flask import Flask
from flask.ext.socketio import SocketIO, emit
import eventlet
eventlet.monkey_patch()
app = Flask(__name__)
socketio = SocketIO(app)
import time
from threading import Thread
@socketio.on('client command')
def response(data, type = None, nonce = None):
socketio.emit('client response', ['foo'])
thread = Thread(target = testThreadFunction)
thread.daemon = True
thread.start()
def testThreadFunction():
#functionWithSimulatedHeavyLoad()
functionWithHeavyLoad()
def functionWithSimulatedHeavyLoad():
time.sleep(5)
def functionWithHeavyLoad():
from datetime import datetime
import subprocess
import sys
from queue import Queue, Empty
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueueOutput(out, queue):
for line in iter(out.readline, b''):
if line == '':
break
queue.put(line)
out.close()
# just anything that takes long to be computed
shellCommand = 'find / test'
p = subprocess.Popen(shellCommand, universal_newlines=True, shell=True, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target = enqueueOutput, args = (p.stdout, q))
t.daemon = True
t.start()
t.join()
text = ''
while True:
try:
line = q.get_nowait()
text += line
print(line)
except Empty:
break
socketio.emit('client response', {'text': text})
socketio.run(app)
import shlex
shellCommand = shlex.split('find / test')
popen = subprocess.Popen(shellCommand, stdout=subprocess.PIPE)
lines_iterator = iter(popen.stdout.readline, b"")
for line in lines_iterator:
print(line)
eventlet.sleep()
find
重载是为了让您的示例更容易重现。但是,在我的代码中,我实际上使用 tesseract "{0}" stdout -l deu
作为卖出命令。这(与 find
不同)仍然会阻止所有内容。这是一个 tesseract
问题而不是 eventlet?但是仍然:如果它发生在一个单独的线程中,当 find
不阻塞时,它会随着上下文切换逐行读取,怎么会阻塞?
感谢这个问题,我今天学到了一些新东西。 Eventlet 确实提供了 subprocess 及其功能的 greenlet 友好版本,但由于某些奇怪的原因,它没有在标准库中修补此模块。
Link 到子流程的 eventlet 实现:https://github.com/eventlet/eventlet/blob/master/eventlet/green/subprocess.py
查看eventletpatcher,打补丁的模块有os、select、socket、thread、time、MySQLdb、builtins和psycopg2。补丁程序中绝对没有对子进程的引用。
好消息是我能够在与您的应用程序非常相似的应用程序中使用 Popen()
,在我替换之后:
import subprocess
与:
from eventlet.green import subprocess
但注意目前发布的eventlet版本(0.17.4)不支持Popen
中的universal_newlines
选项,使用会报错。 master 中支持此选项(这里是添加该选项的 commit)。您要么必须从您的呼叫中删除该选项,要么直接从 github.