在 Tornado 应用程序中将阻塞功能卸载到执行程序的通用解决方案

General solution to offloading blocking function to executor in Tornado app

我正在尝试找到一个通用的解决方案来将阻塞任务卸载到 ThreadPoolExecutor

在下面的示例中,我可以使用 Tornado 的 run_on_executor 装饰器在 NonBlockingHandler 中实现所需的非阻塞结果。

asyncify 装饰器中,我试图完成同样的事情,但它阻止了其他调用。

关于如何让 asyncify 装饰器正常工作而不导致装饰函数阻塞的任何想法?

注意: 我正在使用 Python 3.6.8 和 Tornado 4.5.3

这是完整的工作示例:

import asyncio
from concurrent.futures import ThreadPoolExecutor
import functools
import time

from tornado.concurrent import Future,  run_on_executor
import tornado.ioloop
import tornado.web


def asyncify(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        result = Future()

        def _run_on_executor(fn):
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                with ThreadPoolExecutor(max_workers=1) as executor:
                    return executor.submit(fn, *args, **kwargs)
            return wrapper

        @_run_on_executor
        def _func(*args, **kwargs):
            return func(*args, **kwargs)

        def on_response(response):
           result.set_result(response.result())

        future = _func(*args, **kwargs)
        future.add_done_callback(on_response)

        return await result
    return wrapper


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write(f"Hello, {self.__class__.__name__}")


class AsyncifyHandler(tornado.web.RequestHandler):
    @asyncify
    def get(self):
        print("sleeping")
        time.sleep(5)
        self.write(f"Hello, {self.__class__.__name__}")


class NonBlockingHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(max_workers=1)

    @run_on_executor
    def blocking(self):
        print("sleeping")
        time.sleep(5)
        self.write(f"Hello, {self.__class__.__name__}")

    async def get(self):
        result = Future()
        publish = self.blocking()
        publish.add_done_callback(
            lambda response: result.set_result(response.result())
        )
        return await result


def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/asyncify", AsyncifyHandler),
        (r"/noblock", NonBlockingHandler),
    ], debug=True)


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

退出 with ThreadPoolExecutor 块等待(同步!)执行程序上的所有任务完成。当 IOLoop 运行ning 时,您不能关闭执行程序;只需创建一个全局 运行 并让它永远 运行 即可。