为什么芹菜自动发现不是来自不同文件夹的映射任务?

why celery autodiscover isnt mapping tasks from different folders?

我有以下 celery 项目结构,它不需要使用 django / flask,但是我正在尝试根据此站点的 celery 结构布局来映射我的任务 https://docs.celeryproject.org/en/latest/_modules/celery/app/base.html#Celery.autodiscover_tasks 我自动发现可能会出现一些问题,但我仍然有从不同文件夹映射的零个任务

.
├── docker-compose.yml
├── LICENSE
├── logs
├── README.md
├── redis
│   ├── Dockerfile
│   └── redis.conf
└── thirdparty
    ├── appone
    │   ├── __init__.py
    │   └── tasks.py
    ├── apptwo
    │   ├── __init__.py
    │   └── tasks.py
    └── celery_tasks
        ├── celery.py
        ├── __init__.py
        └── settings.py

thirdparty/celery_tasks/celery.py

from celery import Celery

app = Celery("thirdparty")
app.config_from_object('celery_tasks.settings', namespace="CELERY")
app.autodiscover_tasks(['celery_tasks.appone', 'celery_tasks.apptwo'])

thirdparty/celery_tasks/settings.py

from celery.schedules import crontab
from datetime import timedelta


CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_BEAT_SCHEDULE = {
    "sample_task": {
        "task": "appone.tasks.add",
        'schedule': timedelta(seconds=30),
    },
    "send_email_report": {
        "task": "apptwo.tasks.mult",
        "schedule": crontab(minute="*"),
    },
}

# run task every 30 minutes
CELERY_BEAT_INTERVAL = 30 * 60

thirdparty/apptwo/tasks.py

import time
from celery_tasks.celery import app

@app.task
def mult(x, y):
    print('start apptwo mult function')
    time.sleep(10)
    print('result:', x * y)
    return x * y

让我们关注这个路径树:

└── thirdparty
    ├── appone
    │   ├── __init__.py
    │   └── tasks.py
    ├── apptwo
    │   ├── __init__.py
    │   └── tasks.py
    └── celery_tasks
        ├── celery.py
        ├── __init__.py
        └── settings.py

相对于外部项目(作为源根),模块列表是:

thirdparty.appone.tasks (contains add)
thirdparty.apptwo.tasks (contains mult)
thirdparty.celery_tasks.celery (contains app)
thirdparty.celery_tasks.settings (contains CELERY_BROKER_URL, CELERY_RESULT_BACKEND, etc.)

相对于第三方(作为源根),模块列表是:

appone.tasks (contains add)
apptwo.tasks (contains mult)
celery_tasks.celery (contains app)
celery_tasks.settings (contains CELERY_BROKER_URL, CELERY_RESULT_BACKEND, etc.)

根据您从 thirdparty/apptwo/tasks.py 导入 celery 应用程序的方式,我们可以说您已将第三方设计为源根目录:

from celery_tasks.celery import app  # If the sources root was the outer project, then this would be -> from thirdparty.celery_tasks.celery import app

您访问 ./thirdparty/appone/ 和 ./thirdparty/apptwo/ 的方式错误:

app.autodiscover_tasks(['celery_tasks.appone', 'celery_tasks.apptwo'])

为什么?模块 celery_tasks 没有 .appone 或 .apptwo。它只有 .celery 和 .settings。如果您查看我上面描述的模块列表,这是正确的路径:

app.autodiscover_tasks(['appone', 'apptwo'])  # This assumes that thirdparty is your sources root, thus you wouldn't write it as thirdparty.appone nor thirdparty.apptwo

进一步阅读

鉴于 thirdparty 是源根目录,您必须将导入格式设置为始终相对于它,例如from celery_tasks.celery import appnot 相对于外部项目,例如from thirdparty.celery_tasks.celery import app 例如from myproj.thirdparty.celery_tasks.celery import app.

然后,您必须将其告知 PYTHONPATH,以便您可以执行相对于第三方目录的导入(例如您如何使用 from celery_tasks.celery import app)。

export PYTHONPATH=${PYTHONPATH}:/path/to/myproj/thirdparty

为什么?假设您刚刚为外部项目定义了 PYTHONPATH:

export PYTHONPATH=${PYTHONPATH}:/path/to/myproj

然后,您必须相对于 myproj 而不是相对于 thirdparty 导入模块。所以你在 thirdparty/apptwo/tasks.py 中导入的方式应该是:

from thirdparty.celery_tasks.celery import app

虽然您的自动发现是:

app.autodiscover_tasks(['thirdparty.appone', 'thirdparty.apptwo'])