异步进程池执行程序不在龙卷风中工作
Async process pool executor not working in tornado
我觉得在这一点上我离良好的实践还很远。我在使用龙卷风框架的同时还利用了 ProcessPoolExecutor
import tornado.ioloop
import datetime
import tornado.web
import azure.functions as func
import json
import logging
import requests
import sys
import os
import pymongo
import mongo_config
import re
import concurrent.futures
from azure_model import Pytorch_Azure
MONGO_URL = mongo_config.uri()
mongo_client = pymongo.MongoClient(MONGO_URL)
db = mongo_client['db']
prods = mongo_client['db']['products']
pta = Pytorch_Azure()
def parallel_pred(img):
r = requests.get(img, timeout = 10)
img_id = img.split('/')[-1].split('.')[0]
img_name = 'tmp{}.png'.format(img_id)
with open(img_name, 'wb') as f:
f.write(r.content)
prediction = pta.predict(img_name)
os.remove(img_name)
return prediction
class Predictionator(tornado.web.RequestHandler):
def data_received(self, chunk):
pass
def get(self):
merchant_id = self.get_argument('id', None, True)
prod_type = self.request.uri.split('category=')[1].split('&id=')[0].replace('%20', ' ').replace('%26', '&').replace('%27', '\'')
pred_list = []
outputs = {}
print(type(prod_type))
if merchant_id and prod_type:
counter = 0
try:
print(prod_type)
for i in prods.find({'merchant': int(merchant_id), 'details.product_type':re.compile('^' + prod_type + '$', re.IGNORECASE)}):
prod_img = i['merchantImages'][0]
if prod_img not in pred_list:
pred_list.append(prod_img)
counter += 1
if counter == 5:
break
except:
self.write({'body': 'There was an error with the query. Please ensure you are using a correct merchant id and product type'})
print(pred_list)
if pred_list:
try:
executor = concurrent.futures.ProcessPoolExecutor(4)
for pred_out in executor.map(parallel_pred, pred_list, timeout = 15):
if pred_out['label'] not in outputs.keys():
outputs[pred_out['label']] = 1
else:
outputs[pred_out['label']] += 1
except:
self.write({'body': 'There was an issue making the predictions.'})
if outputs:
prediction = {}
prediction['label'] = max(outputs, key = outputs.get)
prediction['object_id'] = db.categories.find_one({'name':prediction['label']})['_id']
print(outputs)
self.write(json.dumps(prediction))
else:
self.write({'statusCode': 400, 'body':'An error occurred.'})
else:
self.write({'statusCode': 400, 'body':'There were no results returned. Please ensure the id parameter has a valid merchant id and the category id has a valid product type'})
else:
self.write({'statusCode': 400, 'body':'Please pass a name on the query string or in the request body'})
def make_app():
return tornado.web.Application([
(r'/categorize',Predictionator),
])
def start_nado():
print('starting nado')
app = make_app()
server = app.listen(8888)
return server
def restart():
python = sys.executable
os.execl(python, python, * sys.argv)
def stop_nado():
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
ioloop.add_callback(ioloop.close)
print('stopping nado')
def main():
while True:
try:
try:
server = start_nado()
tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
tornado.ioloop.IOLoop.current().start()
except OSError:
print('restarting')
restart()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.instance().stop()
break
if __name__ == "__main__":
try:
main()
except OSError:
tornado.ioloop.IOLoop.instance.stop()
main()
主要问题来自预测器 class。这个想法是它从数据库中取出 5 个产品并对每个产品进行预测,return 其中 class 的预测最多。这工作正常,但需要一段时间,所以我们想将它与流程并行化。
第一个问题是挂断,它会对两个进行预测,然后变得完全没有反应。这是 tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
成为解决方案的时候,基本上每 10 分钟重新启动一次龙卷风服务器。
在此之后,出现 OSError: [Errno 24] Too many open files
错误。这是 restart
函数成为 [hacky] 解决方案的时候,它几乎只是重新启动程序。
整个过程在大约 2 天的时间里运行良好,此后 运行 所在的服务器变得完全没有响应。在这一点上,我只是在寻找正确方向的一点,我怀疑龙卷风是问题所在,但我应该完全使用不同的框架吗?我对 python 的龙卷风和并行进程还很陌生。
谢谢
因为每次 if pred_list
条件满足时您都会创建 4 个新进程。
通常,在 Tornado 程序中,您会创建一个全局 executor
对象并重复使用它。
# create a global object
executor = concurrent.futures.ProcessPoolExecutor(4)
class Predictionator(...):
...
def get():
...
# use the global `executor` object instead of creating a new one
for pred_out in executor.map(...)
另一种方法是在 with...as
语句中创建 executor
,这些进程将在完成任务后自动关闭并清理(请参阅之前答案末尾的注释)使用此方法)。
def get():
...
with concurrent.futures.ProcessPoolExecutor(4) as executor:
for pred_out in executor.map(...)
注意:第一种方法会给你更好的性能。在第二种方法中,涉及创建和关闭进程的开销。
我觉得在这一点上我离良好的实践还很远。我在使用龙卷风框架的同时还利用了 ProcessPoolExecutor
import tornado.ioloop
import datetime
import tornado.web
import azure.functions as func
import json
import logging
import requests
import sys
import os
import pymongo
import mongo_config
import re
import concurrent.futures
from azure_model import Pytorch_Azure
MONGO_URL = mongo_config.uri()
mongo_client = pymongo.MongoClient(MONGO_URL)
db = mongo_client['db']
prods = mongo_client['db']['products']
pta = Pytorch_Azure()
def parallel_pred(img):
r = requests.get(img, timeout = 10)
img_id = img.split('/')[-1].split('.')[0]
img_name = 'tmp{}.png'.format(img_id)
with open(img_name, 'wb') as f:
f.write(r.content)
prediction = pta.predict(img_name)
os.remove(img_name)
return prediction
class Predictionator(tornado.web.RequestHandler):
def data_received(self, chunk):
pass
def get(self):
merchant_id = self.get_argument('id', None, True)
prod_type = self.request.uri.split('category=')[1].split('&id=')[0].replace('%20', ' ').replace('%26', '&').replace('%27', '\'')
pred_list = []
outputs = {}
print(type(prod_type))
if merchant_id and prod_type:
counter = 0
try:
print(prod_type)
for i in prods.find({'merchant': int(merchant_id), 'details.product_type':re.compile('^' + prod_type + '$', re.IGNORECASE)}):
prod_img = i['merchantImages'][0]
if prod_img not in pred_list:
pred_list.append(prod_img)
counter += 1
if counter == 5:
break
except:
self.write({'body': 'There was an error with the query. Please ensure you are using a correct merchant id and product type'})
print(pred_list)
if pred_list:
try:
executor = concurrent.futures.ProcessPoolExecutor(4)
for pred_out in executor.map(parallel_pred, pred_list, timeout = 15):
if pred_out['label'] not in outputs.keys():
outputs[pred_out['label']] = 1
else:
outputs[pred_out['label']] += 1
except:
self.write({'body': 'There was an issue making the predictions.'})
if outputs:
prediction = {}
prediction['label'] = max(outputs, key = outputs.get)
prediction['object_id'] = db.categories.find_one({'name':prediction['label']})['_id']
print(outputs)
self.write(json.dumps(prediction))
else:
self.write({'statusCode': 400, 'body':'An error occurred.'})
else:
self.write({'statusCode': 400, 'body':'There were no results returned. Please ensure the id parameter has a valid merchant id and the category id has a valid product type'})
else:
self.write({'statusCode': 400, 'body':'Please pass a name on the query string or in the request body'})
def make_app():
return tornado.web.Application([
(r'/categorize',Predictionator),
])
def start_nado():
print('starting nado')
app = make_app()
server = app.listen(8888)
return server
def restart():
python = sys.executable
os.execl(python, python, * sys.argv)
def stop_nado():
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
ioloop.add_callback(ioloop.close)
print('stopping nado')
def main():
while True:
try:
try:
server = start_nado()
tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
tornado.ioloop.IOLoop.current().start()
except OSError:
print('restarting')
restart()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.instance().stop()
break
if __name__ == "__main__":
try:
main()
except OSError:
tornado.ioloop.IOLoop.instance.stop()
main()
主要问题来自预测器 class。这个想法是它从数据库中取出 5 个产品并对每个产品进行预测,return 其中 class 的预测最多。这工作正常,但需要一段时间,所以我们想将它与流程并行化。
第一个问题是挂断,它会对两个进行预测,然后变得完全没有反应。这是 tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
成为解决方案的时候,基本上每 10 分钟重新启动一次龙卷风服务器。
在此之后,出现 OSError: [Errno 24] Too many open files
错误。这是 restart
函数成为 [hacky] 解决方案的时候,它几乎只是重新启动程序。
整个过程在大约 2 天的时间里运行良好,此后 运行 所在的服务器变得完全没有响应。在这一点上,我只是在寻找正确方向的一点,我怀疑龙卷风是问题所在,但我应该完全使用不同的框架吗?我对 python 的龙卷风和并行进程还很陌生。
谢谢
因为每次 if pred_list
条件满足时您都会创建 4 个新进程。
通常,在 Tornado 程序中,您会创建一个全局 executor
对象并重复使用它。
# create a global object
executor = concurrent.futures.ProcessPoolExecutor(4)
class Predictionator(...):
...
def get():
...
# use the global `executor` object instead of creating a new one
for pred_out in executor.map(...)
另一种方法是在 with...as
语句中创建 executor
,这些进程将在完成任务后自动关闭并清理(请参阅之前答案末尾的注释)使用此方法)。
def get():
...
with concurrent.futures.ProcessPoolExecutor(4) as executor:
for pred_out in executor.map(...)
注意:第一种方法会给你更好的性能。在第二种方法中,涉及创建和关闭进程的开销。