在未来包装一个队列

Wrapping a Queue in Future

我正在 Python 3.7 中编写一个 Tornado 网络服务器来显示 multiprocessing 库的进程 运行 的状态。

以下代码有效,但我希望能够使用 Tornado 的内置库来完成它,而不是在线程库中进行黑客攻击。在 queue.get 期间,我还没有想出如何在不阻止龙卷风的情况下做到这一点。我认为正确的解决方案是在某种未来包装 get 调用。我已经尝试了几个小时,但还没想出如何做到这一点。

我的多处理脚本内部:

class ProcessToMonitor(multiprocessing.Process)

def __init__(self):
    multiprocessing.Process.__init__(self)
    self.queue = multiprocessing.Queue()

def run():
    while True:
        # do stuff
        self.queue.put(value)

然后,在我的 Tornado 脚本中

class MyWebSocket(tornado.websocket.WebSocketHandler):
    connections = set()

    def open(self):
        self.connections.add(self)

    def close(self):
        self.connections.remove(self)

    @classmethod
    def emit(self, message):
        [client.write_message(message) for client in self.connections]

def worker():
    ptm = ProcessToMonitor()
    ptm.start()
    while True:
        message = ptm.queue.get()
        MyWebSocket.emit(message)

if __name__ == '__main__':
    app = tornado.web.Application([
        (r'/', MainHandler), # Not shown
        (r'/websocket', MyWebSocket)
    ])
    app.listen(8888)

    threading.Thread(target=worker)

    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.start()

queue.get 不是阻塞函数,它只是等待直到队列中有一个项目,以防队列为空。我可以从您的代码中看到 queue.get 非常适合您在 while 循环中的用例。

我认为您可能使用不当。您必须将 worker 函数设为协程(async/await 语法):

async def worker():
    ...
    while True:
        message = await queue.get()
        ...

但是,如果您不想等待某个项目并希望立即继续,其备选方案是 queue.get_nowait

这里要注意的一件事是,如果队列为空,queue.get_nowait 将引发一个名为 QueueEmpty 的异常。因此,您需要处理该异常。

示例:

while True:
    try:
        message = queue.get_nowait()
    except QueueEmpty:
        # wait for some time before
        # next iteration
        # otherwise this loop will
        # keep running for no reason

    MyWebSocket.emit(message)

如您所见,如果队列为空,则必须暂停 while 循环一段时间,以防止系统不堪重负。

那么为什么不首先使用 queue.get