不能 运行 使用 Celery、Flask 和 RabbitMQ 的 Celery 工作人员:无法加载 Celery 应用程序

Can't run Celery worker using Celery, Flask, and RabbitMQ: unable to load celery application

当我 运行 celery -A app.celery worker --loglevel=INFO --pidfile='' 我得到以下信息:

Usage: celery [OPTIONS] COMMAND [ARGS]...

Error: Invalid value for '-A' / '--app':
Unable to load celery application.
'nonetype' object has no attribute '_instantiate_plugins'

据我所知,celery -A [name].celery... [name] 中应该是创建和保存 Celery 实例的文件,在我的例子中是 app.py。 这是我第一次使用 Celery,希望能在这里得到帮助!

我的文件结构如下:

--app
   -- app.py
   -- celery_config
      -- __init__.py
      -- flask_celery.py
app.py
from flask import Flask
from celery_config.flask_celery import make_celery

# Create app
app = Flask(__name__)
app.config.from_envvar("APP_SETTINGS")

...

# Setup Celery
celery = make_celery(app)
flask_celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

def make_celery(app):
    celery = Celery(app.import_name)
    celery.conf.update(
        result_backend=app.config["CELERY_RESULT_BACKEND"],
        broker_url=app.config["CELERY_BROKER_URL"],
        timezone="UTC",
        task_serializer="json",
        accept_content=["json"],
        result_serializer="json"
    )
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

我想通了。

事实证明,因为我是 运行 从 app 目录中执行命令,所以我需要 运行 celery -A celery worker --loglevel=INFO --pidfile='' 而不是 celery -A app.celery worker --loglevel=INFO --pidfile='' -- -A app 在目录 app 中搜索 celery,但我已经在该目录中了。我是在找到 this GitHub 评论后才意识到这一点的。

更新:这也不是答案,问题是我期待 运行 让 Celery 工作人员获取我的 .env 变量,但它没有这样做,因为它不是特定于 Flask 的包。我必须将我的 .env 变量导出到我的本地环境中,因为它试图在没有 DATABASE_URL 变量的情况下实例化应用程序的数据库。 celery -A app.celery worker --loglevel=INFO --pidfile='' 是正确的命令。