在后台线程上执行长时间 运行 计算密集型操作
Performing long running compute intensive operation on background thread
我在 Python Tornado 框架中编写了一个 REST API,它可以预测给定段落中问题的答案。
这是 Tornado 处理程序的 Python 代码:
def post(self):
"""
This function predicts the response from the pre-trained Allen model
"""
try:
request_payload = tornado.escape.json_decode(self.request.body)
if (request_payload is None):
return self._return_response(self, { "message": "Invalid request!" }, 400)
context = request_payload["context"]
question = request_payload["question"]
if(context is None or not context):
return self._return_response(self, { "message": "Context is not provided!" }, 400)
if(question is None or not question):
return self._return_response(self, { "message": "Question is not provided!" }, 400)
# Compute intensive operation which blocks the main thread
answer_prediction = predictor.predict(passage=str(context), question=str(question))
best_answer = answer_prediction["best_span_str"] or "Sorry, no answer found for your question!"
return self._return_response(self, { "answer": best_answer }, 200)
except KeyError:
#Return bad request if any of the keys are missing
return self._return_response(self, { "message": 'Some keys are missing from the request!' }, 400)
except json.decoder.JSONDecodeError:
return self._return_response(self, { "message": 'Cannot decode request body!' }, 400)
except Exception as ex:
return self._return_response(self, { "message": 'Could not complete the request because of some error at the server!', "cause": ex.args[0], "stack_trace": traceback.format_exc(sys.exc_info()) }, 500)
问题是那一行:
answer_prediction = predictor.predict(passage=str(context),
question=str(question))
阻塞传入请求的主线程并等待直到那个长 运行ning 操作完成,同时阻塞其他请求并有时使当前请求超时。
我已阅读 this 回答,其中详细介绍了将长 运行ning 操作放入队列的解决方案,但我不明白。
另外由于Python的GIL只有一个线程可以同时运行,这迫使我产生一个单独的进程来处理它,因为进程是昂贵的,有没有我的问题的任何可行解决方案以及如何处理这种情况。
这是我的问题:
- 如何安全地将计算密集型操作卸载到后台
线程
- 如何优雅地处理超时和异常
- 如何维护一个队列结构来检查长运行ning操作是否已经完成。
我认为您应该将此 API 调用转为异步调用,并立即将 return 转给带有令牌的调用方。
令牌稍后将使用该令牌来检查(使用另一个 API 调用)操作是否完成。
运行 单独线程中的阻塞代码。使用 IOLoop.run_in_executor
.
示例:
from functools import partial
async def post(self):
...
# create a partial object with the keyword arguments
predict_partial = partial(predictor.predict, passage=str(context), question=str(question))
answer_prediction = await IOLoop.current().run_in_executor(None, predict_partial)
...
我在 Python Tornado 框架中编写了一个 REST API,它可以预测给定段落中问题的答案。
这是 Tornado 处理程序的 Python 代码:
def post(self):
"""
This function predicts the response from the pre-trained Allen model
"""
try:
request_payload = tornado.escape.json_decode(self.request.body)
if (request_payload is None):
return self._return_response(self, { "message": "Invalid request!" }, 400)
context = request_payload["context"]
question = request_payload["question"]
if(context is None or not context):
return self._return_response(self, { "message": "Context is not provided!" }, 400)
if(question is None or not question):
return self._return_response(self, { "message": "Question is not provided!" }, 400)
# Compute intensive operation which blocks the main thread
answer_prediction = predictor.predict(passage=str(context), question=str(question))
best_answer = answer_prediction["best_span_str"] or "Sorry, no answer found for your question!"
return self._return_response(self, { "answer": best_answer }, 200)
except KeyError:
#Return bad request if any of the keys are missing
return self._return_response(self, { "message": 'Some keys are missing from the request!' }, 400)
except json.decoder.JSONDecodeError:
return self._return_response(self, { "message": 'Cannot decode request body!' }, 400)
except Exception as ex:
return self._return_response(self, { "message": 'Could not complete the request because of some error at the server!', "cause": ex.args[0], "stack_trace": traceback.format_exc(sys.exc_info()) }, 500)
问题是那一行:
answer_prediction = predictor.predict(passage=str(context), question=str(question))
阻塞传入请求的主线程并等待直到那个长 运行ning 操作完成,同时阻塞其他请求并有时使当前请求超时。
我已阅读 this 回答,其中详细介绍了将长 运行ning 操作放入队列的解决方案,但我不明白。
另外由于Python的GIL只有一个线程可以同时运行,这迫使我产生一个单独的进程来处理它,因为进程是昂贵的,有没有我的问题的任何可行解决方案以及如何处理这种情况。
这是我的问题:
- 如何安全地将计算密集型操作卸载到后台 线程
- 如何优雅地处理超时和异常
- 如何维护一个队列结构来检查长运行ning操作是否已经完成。
我认为您应该将此 API 调用转为异步调用,并立即将 return 转给带有令牌的调用方。
令牌稍后将使用该令牌来检查(使用另一个 API 调用)操作是否完成。
运行 单独线程中的阻塞代码。使用 IOLoop.run_in_executor
.
示例:
from functools import partial
async def post(self):
...
# create a partial object with the keyword arguments
predict_partial = partial(predictor.predict, passage=str(context), question=str(question))
answer_prediction = await IOLoop.current().run_in_executor(None, predict_partial)
...