在未来包装一个队列
Wrapping a Queue in Future
我正在 Python 3.7 中编写一个 Tornado 网络服务器来显示 multiprocessing
库的进程 运行 的状态。
以下代码有效,但我希望能够使用 Tornado 的内置库来完成它,而不是在线程库中进行黑客攻击。在 queue.get
期间,我还没有想出如何在不阻止龙卷风的情况下做到这一点。我认为正确的解决方案是在某种未来包装 get
调用。我已经尝试了几个小时,但还没想出如何做到这一点。
我的多处理脚本内部:
class ProcessToMonitor(multiprocessing.Process)
def __init__(self):
multiprocessing.Process.__init__(self)
self.queue = multiprocessing.Queue()
def run():
while True:
# do stuff
self.queue.put(value)
然后,在我的 Tornado 脚本中
class MyWebSocket(tornado.websocket.WebSocketHandler):
connections = set()
def open(self):
self.connections.add(self)
def close(self):
self.connections.remove(self)
@classmethod
def emit(self, message):
[client.write_message(message) for client in self.connections]
def worker():
ptm = ProcessToMonitor()
ptm.start()
while True:
message = ptm.queue.get()
MyWebSocket.emit(message)
if __name__ == '__main__':
app = tornado.web.Application([
(r'/', MainHandler), # Not shown
(r'/websocket', MyWebSocket)
])
app.listen(8888)
threading.Thread(target=worker)
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
queue.get
不是阻塞函数,它只是等待直到队列中有一个项目,以防队列为空。我可以从您的代码中看到 queue.get
非常适合您在 while 循环中的用例。
我认为您可能使用不当。您必须将 worker
函数设为协程(async
/await
语法):
async def worker():
...
while True:
message = await queue.get()
...
但是,如果您不想等待某个项目并希望立即继续,其备选方案是 queue.get_nowait
。
这里要注意的一件事是,如果队列为空,queue.get_nowait
将引发一个名为 QueueEmpty
的异常。因此,您需要处理该异常。
示例:
while True:
try:
message = queue.get_nowait()
except QueueEmpty:
# wait for some time before
# next iteration
# otherwise this loop will
# keep running for no reason
MyWebSocket.emit(message)
如您所见,如果队列为空,则必须暂停 while 循环一段时间,以防止系统不堪重负。
那么为什么不首先使用 queue.get
?
我正在 Python 3.7 中编写一个 Tornado 网络服务器来显示 multiprocessing
库的进程 运行 的状态。
以下代码有效,但我希望能够使用 Tornado 的内置库来完成它,而不是在线程库中进行黑客攻击。在 queue.get
期间,我还没有想出如何在不阻止龙卷风的情况下做到这一点。我认为正确的解决方案是在某种未来包装 get
调用。我已经尝试了几个小时,但还没想出如何做到这一点。
我的多处理脚本内部:
class ProcessToMonitor(multiprocessing.Process)
def __init__(self):
multiprocessing.Process.__init__(self)
self.queue = multiprocessing.Queue()
def run():
while True:
# do stuff
self.queue.put(value)
然后,在我的 Tornado 脚本中
class MyWebSocket(tornado.websocket.WebSocketHandler):
connections = set()
def open(self):
self.connections.add(self)
def close(self):
self.connections.remove(self)
@classmethod
def emit(self, message):
[client.write_message(message) for client in self.connections]
def worker():
ptm = ProcessToMonitor()
ptm.start()
while True:
message = ptm.queue.get()
MyWebSocket.emit(message)
if __name__ == '__main__':
app = tornado.web.Application([
(r'/', MainHandler), # Not shown
(r'/websocket', MyWebSocket)
])
app.listen(8888)
threading.Thread(target=worker)
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
queue.get
不是阻塞函数,它只是等待直到队列中有一个项目,以防队列为空。我可以从您的代码中看到 queue.get
非常适合您在 while 循环中的用例。
我认为您可能使用不当。您必须将 worker
函数设为协程(async
/await
语法):
async def worker():
...
while True:
message = await queue.get()
...
但是,如果您不想等待某个项目并希望立即继续,其备选方案是 queue.get_nowait
。
这里要注意的一件事是,如果队列为空,queue.get_nowait
将引发一个名为 QueueEmpty
的异常。因此,您需要处理该异常。
示例:
while True:
try:
message = queue.get_nowait()
except QueueEmpty:
# wait for some time before
# next iteration
# otherwise this loop will
# keep running for no reason
MyWebSocket.emit(message)
如您所见,如果队列为空,则必须暂停 while 循环一段时间,以防止系统不堪重负。
那么为什么不首先使用 queue.get
?