无法使用 Celery + Django 在 tasks.py 中导入模型
Can't import models in tasks.py with Celery + Django
我想创建一个后台任务来更新特定日期的记录。我在 RabbitMQ 中使用 Django 和 Celery。
我已经设法在使用此虚拟任务函数保存模型时调用任务:
tasks.py
from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', broker='amqp://localhost//')
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
# (I pass the news id and return it, nothing complicated about it)
return news_id
此任务是从我的 models.py
中的 save() 方法调用的
from django.db import models
from celery import current_app
class News(models.model):
(...)
def save(self, *args, **kwargs):
current_app.send_task('news.tasks.update_news_status', args=(self.id,))
super(News, self).save(*args, **kwargs)
事情是 我想在 tasks.py 中导入我的新闻模型,但如果我想这样:
from .models import News
我收到这个错误:
django.core.exceptions.ImproperlyConfigured: Requested setting
DEFAULT_INDEX_TABLESPACE, but settings are not configured. You must
either define the environment variable DJANGO_SETTINGS_MODULE or call
settings.configure() before accessing settings.
这就是 mi celery.py 的样子
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
我已经试过了:
- can't import django model into celery task
- 我试过在任务方法中进行导入
- 这个我也试过了
- 我也试过创建 utils.py 并导入它,但没成功。
和运行进入不同的错误,但最后我无法在tasks.py
中导入任何模块
我的配置可能有问题,但我看不到错误,我按照 The Celery Docs: First steps with Django
中的步骤操作
此外,我的项目结构如下所示:
├── myapp
│ ├── __init__.py
├── ├── celery.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── news
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── tasks.py
│ ├── urls.py
│ ├── models.py
│ ├── views.py
├── manage.py
我正在从 myapp
目录执行 worker,如下所示:
celery -A news.tasks worker --loglevel=info
我在这里错过了什么?预先感谢您的帮助!
拉姆达:settings.INSTALLED_APPS
编辑
进行评论中建议的更改后:
将此添加到 celery.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
并导入内部方法:tasks.py
from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', broker='amqp://localhost//')
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
from .models import News
return news_id
我收到以下错误:
[2018-07-20 12:24:29,337: ERROR/ForkPoolWorker-1] Task news.tasks.update_news_status[87f9ec92-c260-4ee9-a3bc-5f684c819f79] raised unexpected: ValueError('Attempted relative import in non-package',)
Traceback (most recent call last):
File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 382, in trace_task
R = retval = fun(*args, **kwargs)
File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 641, in __protected_call__
return self.run(*args, **kwargs)
File "/Users/carla/Develop/App/backend/news/tasks.py", line 12, in update_news_status
from .models import News
ValueError: Attempted relative import in non-package
尝试这样的事情。它在 3.1 celery 中工作,导入应该发生在保存方法内部和 super()
之后
from django.db import models
class News(models.model):
(...)
def save(self, *args, **kwargs):
(...)
super(News, self).save(*args, **kwargs)
from task import update_news_status
update_news_status.apply_async((self.id,)) #apply_async or delay
好吧,对于任何为此苦苦挣扎的人...原来我的 celery.py 没有从设置中读取环境变量。
经过一周的大量研究,我意识到 Celery 不是 Django 的一个进程,而是一个 运行 在它之外的进程 (duh),所以当我尝试加载它们已加载的设置,但后来我无法访问我在 .env 中定义的环境变量(我使用 dotenv 库)。 Celery 试图在我的 .bash_profile 中查找环境变量(当然)
所以最后我的解决方案是在定义我的 celery.py
的同一目录中创建一个辅助模块,称为 load_env.py
和以下
from os.path import dirname, join
import dotenv
def load_env():
"Get the path to the .env file and load it."
project_dir = dirname(dirname(__file__))
dotenv.read_dotenv(join(project_dir, '.env'))
然后在我的 celery.py 上(注意最后导入和第一条指令)
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
from .load_env import load_env
load_env()
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('myapp.settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在调用 load_env()
env 变量后加载并且 celery worker 可以访问它们。通过这样做 我现在可以从我的 tasks.py 访问其他模块,这是我的主要问题。
感谢 this guys (Caktus Consulting Group) and their django-project-template,因为如果不是他们,我将找不到答案。谢谢。
这是我会做的(Django 1.11 和 celery 4.2),你的 celery 配置有问题,你尝试重新声明 Celery 实例:
tasks.py
from myapp.celery import app # would contain what you need :)
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
# (I pass the news id and return it, nothing complicated about it)
return news_id
celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp', backend='rpc://', broker=BROKER_URL) # your config here
app.config_from_object('django.myapp:settings', namespace='CELERY') # change here
app.autodiscover_tasks()
models.py
from django.db import models
class News(models.model):
(...)
def save(self, *args, **kwargs):
super(News, self).save(*args, **kwargs)
from news.tasks import update_news_status
update_news_status.delay(self.id) # change here
并使用 celery -A myapp worker --loglevel=info
启动它,因为您的应用程序是在 myapp.celery 中定义的,所以 -A 参数需要是声明 conf 的应用程序
我想创建一个后台任务来更新特定日期的记录。我在 RabbitMQ 中使用 Django 和 Celery。
我已经设法在使用此虚拟任务函数保存模型时调用任务:
tasks.py
from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', broker='amqp://localhost//')
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
# (I pass the news id and return it, nothing complicated about it)
return news_id
此任务是从我的 models.py
中的 save() 方法调用的from django.db import models
from celery import current_app
class News(models.model):
(...)
def save(self, *args, **kwargs):
current_app.send_task('news.tasks.update_news_status', args=(self.id,))
super(News, self).save(*args, **kwargs)
事情是 我想在 tasks.py 中导入我的新闻模型,但如果我想这样:
from .models import News
我收到这个错误:
django.core.exceptions.ImproperlyConfigured: Requested setting DEFAULT_INDEX_TABLESPACE, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings.
这就是 mi celery.py 的样子
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
我已经试过了:
- can't import django model into celery task
- 我试过在任务方法中进行导入
- 这个我也试过了
- 我也试过创建 utils.py 并导入它,但没成功。
和运行进入不同的错误,但最后我无法在tasks.py
中导入任何模块我的配置可能有问题,但我看不到错误,我按照 The Celery Docs: First steps with Django
中的步骤操作此外,我的项目结构如下所示:
├── myapp
│ ├── __init__.py
├── ├── celery.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── news
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── tasks.py
│ ├── urls.py
│ ├── models.py
│ ├── views.py
├── manage.py
我正在从 myapp
目录执行 worker,如下所示:
celery -A news.tasks worker --loglevel=info
我在这里错过了什么?预先感谢您的帮助!
拉姆达:settings.INSTALLED_APPS
编辑
进行评论中建议的更改后:
将此添加到 celery.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
并导入内部方法:tasks.py
from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', broker='amqp://localhost//')
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
from .models import News
return news_id
我收到以下错误:
[2018-07-20 12:24:29,337: ERROR/ForkPoolWorker-1] Task news.tasks.update_news_status[87f9ec92-c260-4ee9-a3bc-5f684c819f79] raised unexpected: ValueError('Attempted relative import in non-package',)
Traceback (most recent call last):
File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 382, in trace_task
R = retval = fun(*args, **kwargs)
File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 641, in __protected_call__
return self.run(*args, **kwargs)
File "/Users/carla/Develop/App/backend/news/tasks.py", line 12, in update_news_status
from .models import News
ValueError: Attempted relative import in non-package
尝试这样的事情。它在 3.1 celery 中工作,导入应该发生在保存方法内部和 super()
之后from django.db import models
class News(models.model):
(...)
def save(self, *args, **kwargs):
(...)
super(News, self).save(*args, **kwargs)
from task import update_news_status
update_news_status.apply_async((self.id,)) #apply_async or delay
好吧,对于任何为此苦苦挣扎的人...原来我的 celery.py 没有从设置中读取环境变量。
经过一周的大量研究,我意识到 Celery 不是 Django 的一个进程,而是一个 运行 在它之外的进程 (duh),所以当我尝试加载它们已加载的设置,但后来我无法访问我在 .env 中定义的环境变量(我使用 dotenv 库)。 Celery 试图在我的 .bash_profile 中查找环境变量(当然)
所以最后我的解决方案是在定义我的 celery.py
的同一目录中创建一个辅助模块,称为 load_env.py
和以下
from os.path import dirname, join
import dotenv
def load_env():
"Get the path to the .env file and load it."
project_dir = dirname(dirname(__file__))
dotenv.read_dotenv(join(project_dir, '.env'))
然后在我的 celery.py 上(注意最后导入和第一条指令)
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
from .load_env import load_env
load_env()
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('myapp.settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在调用 load_env()
env 变量后加载并且 celery worker 可以访问它们。通过这样做 我现在可以从我的 tasks.py 访问其他模块,这是我的主要问题。
感谢 this guys (Caktus Consulting Group) and their django-project-template,因为如果不是他们,我将找不到答案。谢谢。
这是我会做的(Django 1.11 和 celery 4.2),你的 celery 配置有问题,你尝试重新声明 Celery 实例:
tasks.py
from myapp.celery import app # would contain what you need :)
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
# (I pass the news id and return it, nothing complicated about it)
return news_id
celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp', backend='rpc://', broker=BROKER_URL) # your config here
app.config_from_object('django.myapp:settings', namespace='CELERY') # change here
app.autodiscover_tasks()
models.py
from django.db import models
class News(models.model):
(...)
def save(self, *args, **kwargs):
super(News, self).save(*args, **kwargs)
from news.tasks import update_news_status
update_news_status.delay(self.id) # change here
并使用 celery -A myapp worker --loglevel=info
启动它,因为您的应用程序是在 myapp.celery 中定义的,所以 -A 参数需要是声明 conf 的应用程序