使用 gevent.queue.Queue.get():gevent.hub.LoopExit:'This operation would block forever'
Using gevent.queue.Queue.get(): gevent.hub.LoopExit: 'This operation would block forever'
过去几天,我一直在尝试将事件流集成到我的 Flask 应用程序中,在我的本地测试中取得了不错的结果,但当 运行 应用程序在我的服务器上使用 uWSGI 时,情况就更糟了。我的代码基本上是建立在 Flask 的 example 之上的。我正在使用 python 3.4.2
.
问题
当 运行 我的 uWSGI 服务器上的应用程序时,只要客户端尝试连接到 /streaming
端点,它就会引发 gevent.hub.LoopExit: 'This operation would block forever'.
。我的假设是,这是由于无限期地在空队列上调用 get()
造成的。
完整追溯:
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/werkzeug/wsgi.py", line 691, in __next__
return self._next()
File "/usr/lib/python3/dist-packages/werkzeug/wrappers.py", line 81, in _iter_encoded
for item in iterable:
File "./voting/__init__.py", line 49, in gen
result = queue.get(block=True)
File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 284, in get
return self.__get_or_peek(self._get, block, timeout)
File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 261, in __get_or_peek
result = waiter.get()
File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 878, in get
return self.hub.switch()
File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 609, in switch
return greenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x7f717f40f5a0 epoll default pending=0 ref=0 fileno=6>)
我的代码
/streaming
端点:
@app.route("/streaming", methods=["GET", "OPTIONS"])
def streaming():
def gen():
queue = Queue()
subscriptions.add_subscription(session_id, queue)
try:
while True:
result = queue.get() # Where the Exception is raised
ev = ServerSentEvent(json.dumps(result["data"]), result["type"])
yield ev.encode()
except GeneratorExit: # TODO Need a better method to detect disconnecting
subscriptions.remove_subscription(session_id, queue)
return Response(gen(), mimetype="text/event-stream")
正在向队列中添加一个事件:
def notify():
msg = {"type": "users", "data": db_get_all_registered(session_id)}
subscriptions.add_item(session_id, msg) # Adds the item to the relevant queues.
gevent.spawn(notify)
如前所述,它在本地运行良好 werkzeug
:
from app import app
from gevent.wsgi import WSGIServer
from werkzeug.debug import DebuggedApplication
a = DebuggedApplication(app, evalex=True)
server = WSGIServer(("", 5000), a)
server.serve_forever()
我试过的
猴子补丁 monkey.patch_all()
.
正在从 Queue
切换到 JoinableQueue
。
gevent.sleep(0)
结合 Queue.get()
.
该异常基本上意味着 loop/thread 中没有其他 greenlets 运行 可以切换到。所以当 greenlet 去阻塞(queue.get())时,hub 无处可去,无事可做。
相同的代码可以在 gevent 的 WSGIServer 中运行,因为服务器本身是一个 greenlet,它是 运行 socket.accept 循环,所以总是有另一个 greenlet 可以切换到。但显然 uwsgi 不是那样工作的。
解决这个问题的方法是安排其他绿叶 运行。例如,不是生成一个 greenlet 来按需通知,而是安排这样一个 greenlet 已经 运行 并阻塞在自己的队列中。
过去几天,我一直在尝试将事件流集成到我的 Flask 应用程序中,在我的本地测试中取得了不错的结果,但当 运行 应用程序在我的服务器上使用 uWSGI 时,情况就更糟了。我的代码基本上是建立在 Flask 的 example 之上的。我正在使用 python 3.4.2
.
问题
当 运行 我的 uWSGI 服务器上的应用程序时,只要客户端尝试连接到 /streaming
端点,它就会引发 gevent.hub.LoopExit: 'This operation would block forever'.
。我的假设是,这是由于无限期地在空队列上调用 get()
造成的。
完整追溯:
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/werkzeug/wsgi.py", line 691, in __next__
return self._next()
File "/usr/lib/python3/dist-packages/werkzeug/wrappers.py", line 81, in _iter_encoded
for item in iterable:
File "./voting/__init__.py", line 49, in gen
result = queue.get(block=True)
File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 284, in get
return self.__get_or_peek(self._get, block, timeout)
File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 261, in __get_or_peek
result = waiter.get()
File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 878, in get
return self.hub.switch()
File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 609, in switch
return greenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x7f717f40f5a0 epoll default pending=0 ref=0 fileno=6>)
我的代码
/streaming
端点:
@app.route("/streaming", methods=["GET", "OPTIONS"])
def streaming():
def gen():
queue = Queue()
subscriptions.add_subscription(session_id, queue)
try:
while True:
result = queue.get() # Where the Exception is raised
ev = ServerSentEvent(json.dumps(result["data"]), result["type"])
yield ev.encode()
except GeneratorExit: # TODO Need a better method to detect disconnecting
subscriptions.remove_subscription(session_id, queue)
return Response(gen(), mimetype="text/event-stream")
正在向队列中添加一个事件:
def notify():
msg = {"type": "users", "data": db_get_all_registered(session_id)}
subscriptions.add_item(session_id, msg) # Adds the item to the relevant queues.
gevent.spawn(notify)
如前所述,它在本地运行良好 werkzeug
:
from app import app
from gevent.wsgi import WSGIServer
from werkzeug.debug import DebuggedApplication
a = DebuggedApplication(app, evalex=True)
server = WSGIServer(("", 5000), a)
server.serve_forever()
我试过的
猴子补丁
monkey.patch_all()
.正在从
Queue
切换到JoinableQueue
。gevent.sleep(0)
结合Queue.get()
.
该异常基本上意味着 loop/thread 中没有其他 greenlets 运行 可以切换到。所以当 greenlet 去阻塞(queue.get())时,hub 无处可去,无事可做。
相同的代码可以在 gevent 的 WSGIServer 中运行,因为服务器本身是一个 greenlet,它是 运行 socket.accept 循环,所以总是有另一个 greenlet 可以切换到。但显然 uwsgi 不是那样工作的。
解决这个问题的方法是安排其他绿叶 运行。例如,不是生成一个 greenlet 来按需通知,而是安排这样一个 greenlet 已经 运行 并阻塞在自己的队列中。