Performing long running compute intensive operation on background thread

I have written a REST API with the Python Tornado framework that predicts the answer to a question from a given passage.


Here is the Python code for the Tornado handler:

import json
import traceback

import tornado.escape

# `predictor` is the pre-trained Allen model's predictor, loaded elsewhere
def post(self):
    """
    This function predicts the response from the pre-trained Allen model
    """    
    try:
        request_payload = tornado.escape.json_decode(self.request.body)

        if (request_payload is None):
            return self._return_response(self, { "message": "Invalid request!" }, 400)

        context = request_payload["context"]
        question = request_payload["question"]

        if(context is None or not context):
            return self._return_response(self, { "message": "Context is not provided!" }, 400)

        if(question is None or not question):
            return self._return_response(self, { "message": "Question is not provided!" }, 400)

        # Compute intensive operation which blocks the main thread
        answer_prediction = predictor.predict(passage=str(context), question=str(question))
        best_answer = answer_prediction["best_span_str"] or "Sorry, no answer found for your question!"

        return self._return_response(self, { "answer": best_answer }, 200)

    except KeyError:
        #Return bad request if any of the keys are missing
        return self._return_response(self, { "message": 'Some keys are missing from the request!' }, 400)

    except json.decoder.JSONDecodeError:
        return self._return_response(self, { "message": 'Cannot decode request body!' }, 400)

    except Exception as ex:
        return self._return_response(self, { "message": 'Could not complete the request because of some error at the server!', "cause": ex.args[0], "stack_trace": traceback.format_exc(sys.exc_info()) }, 500)

The problem is that the line:

answer_prediction = predictor.predict(passage=str(context), question=str(question))

blocks the main thread handling incoming requests and waits until that long-running operation has finished, which blocks other requests in the meantime and sometimes causes the current request to time out.


I have read this answer, which details a solution of putting the long-running operation into a queue, but I did not understand it.
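For reference, the queue-based approach in that answer boils down to roughly the sketch below: requests push work onto a queue and a dedicated worker thread drains it, so the request-handling thread never blocks. The names are hypothetical and the worker is still subject to the GIL:

import queue
import threading

# Long-running predict calls are queued here instead of running on the request thread
job_queue = queue.Queue()

def worker():
    while True:
        passage, question, on_done = job_queue.get()
        try:
            # `predictor` is the same predictor used in the handler above
            on_done(predictor.predict(passage=passage, question=question))
        finally:
            job_queue.task_done()

# One dedicated worker thread; still limited by the GIL, but the thread
# accepting HTTP requests no longer waits for the prediction to finish
threading.Thread(target=worker, daemon=True).start()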

Also, because of Python's GIL, only one thread can run at a time, which would force me to spawn a separate process to handle this, and processes are costly. Is there any viable solution to my problem, and how should I deal with this situation?

That is my question.

I think you should turn this API call into an asynchronous call and return to the caller immediately with a token.

The caller then uses that token later to check (with another API call) whether the operation has completed.
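As a rough illustration of that token-based flow (a minimal sketch only; the handler names, the in-memory `jobs` store, and the two-endpoint layout are assumptions, not part of the original API):

import uuid
from concurrent.futures import ThreadPoolExecutor

import tornado.escape
import tornado.web

executor = ThreadPoolExecutor(max_workers=1)
jobs = {}  # token -> Future; in-memory store, for illustration only

class SubmitHandler(tornado.web.RequestHandler):
    def post(self):
        payload = tornado.escape.json_decode(self.request.body)
        token = uuid.uuid4().hex
        # Schedule the prediction in the background and return to the caller immediately
        jobs[token] = executor.submit(predictor.predict,
                                      passage=str(payload["context"]),
                                      question=str(payload["question"]))
        self.set_status(202)
        self.write({"token": token})

class ResultHandler(tornado.web.RequestHandler):
    def get(self, token):
        future = jobs.get(token)
        if future is None:
            self.set_status(404)
            self.write({"message": "Unknown token!"})
        elif not future.done():
            self.set_status(202)
            self.write({"status": "pending"})
        else:
            self.write({"answer": future.result()["best_span_str"]})

The caller would POST to the submit endpoint, receive the token, and poll the result endpoint until the answer is ready.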

Run the blocking code in a separate thread. Use IOLoop.run_in_executor.

Example:

from functools import partial

from tornado.ioloop import IOLoop

async def post(self):
    ...

    # create a partial object with the keyword arguments
    predict_partial = partial(predictor.predict, passage=str(context), question=str(question))

    answer_prediction = await IOLoop.current().run_in_executor(None, predict_partial)

    ...
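One caveat: passing None uses the IOLoop's default thread pool, which keeps the event loop free to accept requests but, because of the GIL, still shares the interpreter with other Python code while predict runs. If the predictor and its arguments can be pickled (an assumption about the model, not something stated above), a ProcessPoolExecutor can be passed instead to move the CPU-bound work into a separate process:

from concurrent.futures import ProcessPoolExecutor
from functools import partial

from tornado.ioloop import IOLoop

# A small dedicated pool; separate processes sidestep the GIL for CPU-bound work
process_pool = ProcessPoolExecutor(max_workers=2)

async def predict_in_process(context, question):
    predict_partial = partial(predictor.predict, passage=str(context), question=str(question))
    # Only works if `predictor` and its arguments are picklable across process boundaries
    return await IOLoop.current().run_in_executor(process_pool, predict_partial)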