在烧瓶框架中将应用程序上下文传递给芹菜

Pass application context to celery in flask framework

我尝试将 celery 添加到我现有的 flask 项目中。添加后,在 运行 时出现 "working outside of application context" 错误。芹菜工人似乎缺少我的应用程序上下文。但是我不确定在这种情况下将应用程序上下文传递给 celery worker 的位置。

这是我目前的结构(我试图遵循带有蓝图和 api 文档的工厂模式):

-run.py
-app
    -module1
      -controller.py
      -model.py
      -service.py
    -__init__.py
    -config.py

对于init.py

# __init__.py
import os
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from app.config import Config
from flask_restplus import Api
from celery import Celery

cors = CORS()
db = SQLAlchemy()
api = Api()
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, include=["app.module1.service"])

def create_app(config_class = Config):
    app = Flask(__name__, static_url_path='')
    app.config.from_object(Config)

    cors.init_app(app)
    db.init_app(app)
    api.init_app(app=app)
    celery.conf.update(app.config)

    from app.module1.controller import blueprint
    from app.module1.controller import ns

    app.register_blueprint(blueprint)
    api.add_namespace(ns)

    return app

对于run.py

from app import create_app
app = create_app()
if __name__ == '__main__':
    app.run(threaded=True, debug=True)

对于service.py

from app import db, celery
@celery.task(bind=True)
def service1(self):
    # do somethigng & return

对于controller.py

from flask import Blueprint
from flask_restplus import Api, Resouce
blueprint = Blueprint('service', __name__)
apis = Api(app = blueprint)
ns = apis.namespace('service', 'service description')
@ns.route("/")
class SomeList(Resource):
    def get(self):
        service1.apply_async()
        # return

我认为混淆是基于这样一个事实,即您正试图 "pass" Celery worker 的应用程序上下文。实际上,Flask 进程无法将上下文传递给 worker,因为它们是不同的进程。 Celery 工作进程需要通过调用 create_app() 创建自己的 Flask 应用程序实例,以便它可以在需要时推送自己的应用程序上下文。

例如,在您的 service1 任务中:

from app import db, celery, create_app

@celery.task(bind=True)
def service1(self):
    app = create_app()
    with app.app_context():
        # do somethigng & return

为了提高效率,您可以创建一个全局 app 供所有任务共享:

from app import db, celery, create_app
app = create_app()

@celery.task(bind=True)
def service1(self):
    with app.app_context():
        # do somethigng & return