使用 Tornado web 框架的异步任务执行

Asynchronous task execution using Tornado web framework

我熟悉事件驱动编程,但我遇到了这个问题,我已经终止了可能的解决方案。我阅读了 Tornado 的文档,我尝试使用:

  1. 期货
  2. gen.coroutine
  3. 异步
  4. add_timeout

但我无法解决以下问题:

当执行一个计算量大的函数时,问题就来了,比如说函数 A,它最多可能需要 5 分钟才能终止

def functionA(message):
    params = extract_params(message)
    cmd = "computationally_expensive_tool"
    out = check_output(cmd, shell=True, stderr=STDOUT, cwd=working_dir)
    ...
    return json.dumps({
                        "error": False,
                        "message": "computationally_expensive_tool_execution_terminated",
                        "type": X
                    })

我的问题是如何以异步方式执行该函数,以便在它准备就绪时我仍然可以处理其他消息和 functionA 的结果?

如果 functionA 是一个不能异步的阻塞函数,您可能希望 运行 它在线程池中:

executor = concurrent.futures.ThreadPoolExecutor()

@gen.coroutine
def on_message(self, message):
    if message['type'] is X:
        yield executor.submit(functionA, message['data'])
    elif message['type'] is Y:
        functionB(message['data'])

这将阻止此 websocket 直到 functionA 完成,但允许其他连接继续工作。如果您需要在 functionA 运行 时继续处理来自 相同 连接的其他类型的消息,则需要更复杂的安排,可能涉及 tornado.queues.Queue.