使用 Tornado web 框架的异步任务执行
Asynchronous task execution using Tornado web framework
我熟悉事件驱动编程,但我遇到了这个问题,我已经终止了可能的解决方案。我阅读了 Tornado 的文档,我尝试使用:
- 期货
- gen.coroutine
- 异步
- add_timeout
但我无法解决以下问题:
我有一个只监听新消息的 websocket 服务器
并根据消息类型调用特定函数
class WebSocketHandler(tornado.websocket.WebSocketHandler):
...
def on_message(self, message):
if message['type'] is X:
self.write(functionA(message['data']))
elif message['type'] is Y:
self.write(functionB(message['data']))
...
当执行一个计算量大的函数时,问题就来了,比如说函数 A,它最多可能需要 5 分钟才能终止
def functionA(message):
params = extract_params(message)
cmd = "computationally_expensive_tool"
out = check_output(cmd, shell=True, stderr=STDOUT, cwd=working_dir)
...
return json.dumps({
"error": False,
"message": "computationally_expensive_tool_execution_terminated",
"type": X
})
我的问题是如何以异步方式执行该函数,以便在它准备就绪时我仍然可以处理其他消息和 functionA 的结果?
如果 functionA
是一个不能异步的阻塞函数,您可能希望 运行 它在线程池中:
executor = concurrent.futures.ThreadPoolExecutor()
@gen.coroutine
def on_message(self, message):
if message['type'] is X:
yield executor.submit(functionA, message['data'])
elif message['type'] is Y:
functionB(message['data'])
这将阻止此 websocket 直到 functionA
完成,但允许其他连接继续工作。如果您需要在 functionA
运行 时继续处理来自 相同 连接的其他类型的消息,则需要更复杂的安排,可能涉及 tornado.queues.Queue
.
我熟悉事件驱动编程,但我遇到了这个问题,我已经终止了可能的解决方案。我阅读了 Tornado 的文档,我尝试使用:
- 期货
- gen.coroutine
- 异步
- add_timeout
但我无法解决以下问题:
我有一个只监听新消息的 websocket 服务器 并根据消息类型调用特定函数
class WebSocketHandler(tornado.websocket.WebSocketHandler):
... def on_message(self, message): if message['type'] is X: self.write(functionA(message['data'])) elif message['type'] is Y: self.write(functionB(message['data'])) ...
当执行一个计算量大的函数时,问题就来了,比如说函数 A,它最多可能需要 5 分钟才能终止
def functionA(message):
params = extract_params(message)
cmd = "computationally_expensive_tool"
out = check_output(cmd, shell=True, stderr=STDOUT, cwd=working_dir)
...
return json.dumps({
"error": False,
"message": "computationally_expensive_tool_execution_terminated",
"type": X
})
我的问题是如何以异步方式执行该函数,以便在它准备就绪时我仍然可以处理其他消息和 functionA 的结果?
如果 functionA
是一个不能异步的阻塞函数,您可能希望 运行 它在线程池中:
executor = concurrent.futures.ThreadPoolExecutor()
@gen.coroutine
def on_message(self, message):
if message['type'] is X:
yield executor.submit(functionA, message['data'])
elif message['type'] is Y:
functionB(message['data'])
这将阻止此 websocket 直到 functionA
完成,但允许其他连接继续工作。如果您需要在 functionA
运行 时继续处理来自 相同 连接的其他类型的消息,则需要更复杂的安排,可能涉及 tornado.queues.Queue
.