用于 ML 预测的 Celery 任务在执行中挂起

Celery task for ML prediction hangs in execution

我正在尝试创建一个 Web 应用程序,它接收来自 POST 请求的输入并根据该输入提供一些 ML 预测。

由于预测模型很重,我不希望用户等待计算完成。相反,我将繁重的计算委托给 Celery 任务,用户稍后可以检查结果。

我正在使用带有 Celery、Redis 和 Flower 的简单 Flask 应用程序。

我的view.py:

@ns.route('predict/')
class Predict(Resource):
    ...
    def post(self):
        ...
        do_categorize(data)
        return jsonify(success=True)

我的 tasks.py 文件看起来像这样:

from ai.categorizer import Categorizer
categorizer = Categorizer(
    model_path='category_model.h5',
    tokenizer_path='tokenize.joblib',
    labels_path='labels.joblib'
)


@task()
def do_categorize(data):
    result = categorizer.predict(data)
    print(result)
    # Write result to the DB
    ...

我的predict()方法里面Categorizerclass:

def predict(self, value):
    K.set_session(self.sess)
    with self.sess.as_default():
        with self.graph.as_default():
            prediction = self.model.predict(np.asarray([value], dtype='int64'))
            return prediction

我是这样的 运行ning Celery:

celery worker -A app.celery --loglevel=DEBUG

我最近几天遇到的问题是 categorizer.predict(data) 调用在执行过程中挂起。

我尝试在 post 方法中 运行 categorizer.predict(data) 并且有效。但是,如果我将它放在 Celery 任务中,它就会停止工作。没有控制台日志,如果我尝试调试它,它只会在 .predict().

处冻结

我的问题:

感谢这个 SO question 我找到了问题的答案:

事实证明,Keras 使用 Threads 池而不是默认的 Process 更好。

我很幸运,Celery 4.4 Threads 不久前重新引入了池化。 您可以在 Celery 4.4 Changelogs:

阅读更多内容

Threaded Tasks Pool

We reintroduced a threaded task pool using concurrent.futures.ThreadPoolExecutor.

The previous threaded task pool was experimental. In addition it was based on the threadpool package which is obsolete.

You can use the new threaded task pool by setting worker_pool to ‘threads` or by passing –pool threads to the celery worker command.

现在您可以使用线程而不是进程来进行池化。

celery worker -A your_application --pool threads --loginfo=INFO

如果你不能使用最新的 Celery 版本,你可以使用 gevent 包:

pip install gevent
celery worker -A your_application --pool gevent --loginfo=INFO