在 Tornado 应用程序中将阻塞功能卸载到执行程序的通用解决方案
General solution to offloading blocking function to executor in Tornado app
我正在尝试找到一个通用的解决方案来将阻塞任务卸载到 ThreadPoolExecutor
。
在下面的示例中,我可以使用 Tornado 的 run_on_executor
装饰器在 NonBlockingHandler
中实现所需的非阻塞结果。
在 asyncify
装饰器中,我试图完成同样的事情,但它阻止了其他调用。
关于如何让 asyncify
装饰器正常工作而不导致装饰函数阻塞的任何想法?
注意: 我正在使用 Python 3.6.8 和 Tornado 4.5.3
这是完整的工作示例:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import functools
import time
from tornado.concurrent import Future, run_on_executor
import tornado.ioloop
import tornado.web
def asyncify(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
result = Future()
def _run_on_executor(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
with ThreadPoolExecutor(max_workers=1) as executor:
return executor.submit(fn, *args, **kwargs)
return wrapper
@_run_on_executor
def _func(*args, **kwargs):
return func(*args, **kwargs)
def on_response(response):
result.set_result(response.result())
future = _func(*args, **kwargs)
future.add_done_callback(on_response)
return await result
return wrapper
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write(f"Hello, {self.__class__.__name__}")
class AsyncifyHandler(tornado.web.RequestHandler):
@asyncify
def get(self):
print("sleeping")
time.sleep(5)
self.write(f"Hello, {self.__class__.__name__}")
class NonBlockingHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(max_workers=1)
@run_on_executor
def blocking(self):
print("sleeping")
time.sleep(5)
self.write(f"Hello, {self.__class__.__name__}")
async def get(self):
result = Future()
publish = self.blocking()
publish.add_done_callback(
lambda response: result.set_result(response.result())
)
return await result
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
(r"/asyncify", AsyncifyHandler),
(r"/noblock", NonBlockingHandler),
], debug=True)
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
退出 with ThreadPoolExecutor
块等待(同步!)执行程序上的所有任务完成。当 IOLoop 运行ning 时,您不能关闭执行程序;只需创建一个全局 运行 并让它永远 运行 即可。
我正在尝试找到一个通用的解决方案来将阻塞任务卸载到 ThreadPoolExecutor
。
在下面的示例中,我可以使用 Tornado 的 run_on_executor
装饰器在 NonBlockingHandler
中实现所需的非阻塞结果。
在 asyncify
装饰器中,我试图完成同样的事情,但它阻止了其他调用。
关于如何让 asyncify
装饰器正常工作而不导致装饰函数阻塞的任何想法?
注意: 我正在使用 Python 3.6.8 和 Tornado 4.5.3
这是完整的工作示例:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import functools
import time
from tornado.concurrent import Future, run_on_executor
import tornado.ioloop
import tornado.web
def asyncify(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
result = Future()
def _run_on_executor(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
with ThreadPoolExecutor(max_workers=1) as executor:
return executor.submit(fn, *args, **kwargs)
return wrapper
@_run_on_executor
def _func(*args, **kwargs):
return func(*args, **kwargs)
def on_response(response):
result.set_result(response.result())
future = _func(*args, **kwargs)
future.add_done_callback(on_response)
return await result
return wrapper
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write(f"Hello, {self.__class__.__name__}")
class AsyncifyHandler(tornado.web.RequestHandler):
@asyncify
def get(self):
print("sleeping")
time.sleep(5)
self.write(f"Hello, {self.__class__.__name__}")
class NonBlockingHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(max_workers=1)
@run_on_executor
def blocking(self):
print("sleeping")
time.sleep(5)
self.write(f"Hello, {self.__class__.__name__}")
async def get(self):
result = Future()
publish = self.blocking()
publish.add_done_callback(
lambda response: result.set_result(response.result())
)
return await result
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
(r"/asyncify", AsyncifyHandler),
(r"/noblock", NonBlockingHandler),
], debug=True)
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
退出 with ThreadPoolExecutor
块等待(同步!)执行程序上的所有任务完成。当 IOLoop 运行ning 时,您不能关闭执行程序;只需创建一个全局 运行 并让它永远 运行 即可。