Python Tornado - 如何实现长轮询服务器以从队列中读取

Python Tornado - How to Implement Long-Polling Server to Read from a Queue

我正在尝试构建一个 Web 服务器以通过 AJAX 收集 "commands",然后通过长轮询将命令分发给客户端。

目标是有人将一些数据发布到 /add-command。

另一个客户端实现了一个长轮询客户端点击 /poll 等待命令执行。

我认为队列是用于保存等待关注的命令的正确数据结构。我希望这些命令基本上立即分发给任何长轮询客户端,但如果当前没有客户端正在轮询则保留。

这是我的 python 脚本。

import os
import time
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import Queue
import multiprocessing.pool
import mysql.connector
import urlparse
import uuid
import json

_commandQueue = Queue.Queue()
_commandPollInterval = 0.2
_commandPollTimeout = 10

class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.htm")

class AddCommandHandler(tornado.web.RequestHandler):
    def post(self):
        d = urlparse.parse_qs(self.request.body)
        _commandQueue.put(d)
        self.write(str(True))

class PollHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        self.write("start")
        d = 1
        d = yield self.getCommand()
        self.write(str(d))
        self.write("end")
        self.finish()
    @tornado.gen.coroutine
    def getCommand(self):
        start = time.time()
        while (time.time() - start) < _commandPollTimeout * 1000:
            if not _commandQueue.empty:
                return _commandQueue.get()
            else:
                time.sleep(_commandPollInterval)
        return None 

def main():
    application = tornado.web.Application(
        [
            (r"/", HomeHandler),
            (r"/add-command", AddCommandHandler),
            (r"/poll", PollHandler),
        ], 
        debug=True, 
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
    )
    tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000)))
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

AddCommandHandler 可以很好地将项目放入 _commandQueue

PollHandler 请求刚刚超时。如果我调用 PollHandler,它似乎锁定了 _commandQueue,我无法放入或取出它。

我怀疑我需要加入队列,但我似乎无法在代码中找到合适的时间。

更新 -- 这是我的最终代码,感谢答案

import os
import time
import datetime
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.queues
import urlparse
import json

_commandQueue = tornado.queues.Queue()
_commandPollInterval = 0.2
_commandPollTimeout = 10

class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.htm")

class AddCommandHandler(tornado.web.RequestHandler):
    def get(self):
        cmd = urlparse.parse_qs(self.request.body)
        _commandQueue.put(cmd)
        self.write(str(cmd))
    def post(self):
        cmd = urlparse.parse_qs(self.request.body)
        _commandQueue.put(cmd)
        self.write(str(cmd))

class PollHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        cmd = yield self.getCommand()
        self.write(str(cmd))
    @tornado.gen.coroutine
    def getCommand(self):
        try:
            cmd = yield _commandQueue.get(
                timeout=datetime.timedelta(seconds=_commandPollTimeout)
            )
            raise tornado.gen.Return(cmd)
        except tornado.gen.TimeoutError:
            raise tornado.gen.Return()

def main():
    application = tornado.web.Application(
        [
            (r"/", HomeHandler),
            (r"/add-command", AddCommandHandler),
            (r"/poll", PollHandler),
        ], 
        debug=True, 
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
    )
    tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000)))
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

不能在侦听器中使用睡眠,因为它会阻止从输入流读取。 time.sleep(_commandPollInterval)。你应该使用的是yield gen.sleep(_commandPollInterval)

在异步模型中你应该省略阻塞操作,time.sleep 在你的代码中是邪恶的。此外,我认为最好的方法是使用龙卷风的(在异步接口中)队列 - tornado.queue.Queue 并使用异步获取:

import datetime
import tornado.gen
import tornado.queues

_commandQueue = tornado.queues.Queue()


    # ...rest of the code ...

    @tornado.gen.coroutine
    def getCommand(self):
        try:
            # wait for queue item if cannot obtain in timeout raise exception
            cmd = yield _commandQueue.get(
                timeout=datetime.timedelta(seconds=_commandPollTimeout)
            )
            return cmd
        except tornado.gen.Timeout:
            return None

注意:模块 tornado.queues 自 Tornado 4.x 以来可用,如果您使用旧版本,Toro 会有所帮助。