app/tasks.py 导入模型时,Celery beat 周期性任务失败
Celery beat periodic tasks fail when the app/tasks.py imports a Model
我有一个与 celery django demo 相同的程序结构,运行良好。问题是在我的 demoapp/tasks.py 模拟中我导入了一个触发
的模型 (from .models import Service
)
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
至少可以说很奇怪。计划的全部要点是通过向 api 发送请求来更新数据库。取出导入语句并注释掉适当的代码后,任务将按预期运行。这是模型的代码,但是无论模型有多少字段都会发生错误,这让我相信错误不是由于模型的编程方式造成的。我在跟踪火车。
from django.db import models
# Create your models here.
class Service(models.Model):
id = models.TextField(primary_key=True)
scheduled = models.DateTimeField()
expected = models.DateTimeField(null=True, blank=True) # null means cancelled
service = models.TextField()
platform = models.TextField()
origin = models.TextField()
dest = models.TextField()
编辑:这是我的 demoapp/tasks.py 文件:
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import requests
from .models import Service
from django.db import transaction
import datetime
import pytz
def clean(entry):
def remove_location_lists(i):
"""takes care of services with multiple origins or destinations, taking only the first of each."""
if type(i["origin"]["location"]) is list:
i["origin"] = i["origin"]["location"][0]["locationName"]
else:
i["origin"] = i["origin"]["location"]["locationName"]
if type(i["dest"]["location"]) is list:
i["dest"] = i["dest"]["location"][0]["locationName"]
else:
i["dest"] = i["dest"]["location"]["locationName"]
return i
def parse_time(i):
"""converts the raw time string to a datetime object, and includes some logic to manage crossing midnight."""
if i["expected"] == "Cancelled":
i["service"] = "cancelled"
i["expected"] = None
elif i["expected"] == "Delayed": # blank
i["service"] = "delayed"
i["expected"] = None
else:
i["service"] = "ontime"
i["expected"] = pytz.utc.localize(datetime.datetime.strptime(i["scheduled"], '%H:%M'))
i["scheduled"] = pytz.utc.localize(datetime.datetime.strptime(i["scheduled"], '%H:%M'))
i["scheduled"] = i["scheduled"].replace(day=datetime.date.today().day, month=datetime.date.today().month,
year=datetime.date.today().year)
try:
i["expected"] = i["expected"].replace(day=datetime.date.today().day, month=datetime.date.today().month,
year=datetime.date.today().year)
# include day detection logic!
except Exception as inst:
pass
return i
entry = remove_location_lists(entry)
entry = parse_time(entry)
return entry
@shared_task
def get_trips():
print("run task")
url = "http://www.southernrailway.com/ajax/departures/json/"
get = {'from': 'GTW', 'to': '', 'id': ''}
r = requests.post(url, get)
json = r.json()
unsaved = []
for i in json["arrayServices"]:
entry = {'id': i["serviceID"],
'scheduled': i["std"],
'expected': i["etd"],
'service': i["service"],
'platform': i["platform"],
'origin': i["origin"],
'dest': i["destination"]}
entry = clean(entry)
entry = Service(id=entry['id'], scheduled=entry['scheduled'], expected=entry["expected"],
service=entry['service'], platform=entry['platform'], origin=entry['origin'],
dest=entry['dest'])
unsaved.append(entry)
with transaction.atomic():
for i in unsaved:
i.save()
而且,为了更好的衡量,这里是我的 celery.py:
的内容
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from shitternrailways.tasks import get_trips
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'arlyon.settings')
app = Celery('arlyon')
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(1, get_trips.s(), name="get trips")
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
最后是完整的回溯:
/Users/v/Documents/programming/venvs/arlyon/bin/python /Users/v/Documents/programming/arlyon/manage.py runserver 8000
Traceback (most recent call last):
File "/Users/v/Documents/programming/arlyon/manage.py", line 22, in <module>
execute_from_command_line(sys.argv)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/core/management/__init__.py", line 353, in execute_from_command_line
utility.execute()
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/core/management/__init__.py", line 302, in execute
settings.INSTALLED_APPS
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 55, in __getattr__
self._setup(name)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 43, in _setup
self._wrapped = Settings(settings_module)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 99, in __init__
mod = importlib.import_module(self.SETTINGS_MODULE)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 986, in _gcd_import
File "<frozen importlib._bootstrap>", line 969, in _find_and_load
File "<frozen importlib._bootstrap>", line 944, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 986, in _gcd_import
File "<frozen importlib._bootstrap>", line 969, in _find_and_load
File "<frozen importlib._bootstrap>", line 958, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 673, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 665, in exec_module
File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
File "/Users/v/Documents/programming/arlyon/arlyon/__init__.py", line 5, in <module>
from .celery import app as celery_app
File "/Users/v/Documents/programming/arlyon/arlyon/celery.py", line 4, in <module>
from shitternrailways.tasks import get_trips
File "/Users/v/Documents/programming/arlyon/shitternrailways/tasks.py", line 5, in <module>
from .models import Service
File "/Users/v/Documents/programming/arlyon/shitternrailways/models.py", line 5, in <module>
class Service(models.Model):
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/db/models/base.py", line 94, in __new__
app_config = apps.get_containing_app_config(module)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/apps/registry.py", line 239, in get_containing_app_config
self.check_apps_ready()
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/apps/registry.py", line 124, in check_apps_ready
raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Process finished with exit code 1
任何搜索都没有提供多少帮助,这可能是因为 celery 4 还很年轻。所以我转向你们。感谢您提供的任何提示。
您正在为任务使用@shared_task装饰器。这意味着该任务未附加到任何应用程序。但是,在 celery.py 中,您正尝试将此任务用于节拍调度程序,它会抛出错误。
我看到了两种可能的解决方法。首先,您可以将此任务从 demoapp 移至 celery.py 并将其与 @app.task 装饰器一起使用。
其次,您可以尝试从应用程序访问任务。为此,您调用了 app.autodiscover_tasks()。这意味着任务已加载到应用程序中。所以你可以尝试:
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(1, app.tasks["demoapp.get_trips"], name="get trips")
不过还需要测试一下。我没有在我的机器上试过。祝你好运!
我有一个与 celery django demo 相同的程序结构,运行良好。问题是在我的 demoapp/tasks.py 模拟中我导入了一个触发
的模型 (from .models import Service
)
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
至少可以说很奇怪。计划的全部要点是通过向 api 发送请求来更新数据库。取出导入语句并注释掉适当的代码后,任务将按预期运行。这是模型的代码,但是无论模型有多少字段都会发生错误,这让我相信错误不是由于模型的编程方式造成的。我在跟踪火车。
from django.db import models
# Create your models here.
class Service(models.Model):
id = models.TextField(primary_key=True)
scheduled = models.DateTimeField()
expected = models.DateTimeField(null=True, blank=True) # null means cancelled
service = models.TextField()
platform = models.TextField()
origin = models.TextField()
dest = models.TextField()
编辑:这是我的 demoapp/tasks.py 文件:
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import requests
from .models import Service
from django.db import transaction
import datetime
import pytz
def clean(entry):
def remove_location_lists(i):
"""takes care of services with multiple origins or destinations, taking only the first of each."""
if type(i["origin"]["location"]) is list:
i["origin"] = i["origin"]["location"][0]["locationName"]
else:
i["origin"] = i["origin"]["location"]["locationName"]
if type(i["dest"]["location"]) is list:
i["dest"] = i["dest"]["location"][0]["locationName"]
else:
i["dest"] = i["dest"]["location"]["locationName"]
return i
def parse_time(i):
"""converts the raw time string to a datetime object, and includes some logic to manage crossing midnight."""
if i["expected"] == "Cancelled":
i["service"] = "cancelled"
i["expected"] = None
elif i["expected"] == "Delayed": # blank
i["service"] = "delayed"
i["expected"] = None
else:
i["service"] = "ontime"
i["expected"] = pytz.utc.localize(datetime.datetime.strptime(i["scheduled"], '%H:%M'))
i["scheduled"] = pytz.utc.localize(datetime.datetime.strptime(i["scheduled"], '%H:%M'))
i["scheduled"] = i["scheduled"].replace(day=datetime.date.today().day, month=datetime.date.today().month,
year=datetime.date.today().year)
try:
i["expected"] = i["expected"].replace(day=datetime.date.today().day, month=datetime.date.today().month,
year=datetime.date.today().year)
# include day detection logic!
except Exception as inst:
pass
return i
entry = remove_location_lists(entry)
entry = parse_time(entry)
return entry
@shared_task
def get_trips():
print("run task")
url = "http://www.southernrailway.com/ajax/departures/json/"
get = {'from': 'GTW', 'to': '', 'id': ''}
r = requests.post(url, get)
json = r.json()
unsaved = []
for i in json["arrayServices"]:
entry = {'id': i["serviceID"],
'scheduled': i["std"],
'expected': i["etd"],
'service': i["service"],
'platform': i["platform"],
'origin': i["origin"],
'dest': i["destination"]}
entry = clean(entry)
entry = Service(id=entry['id'], scheduled=entry['scheduled'], expected=entry["expected"],
service=entry['service'], platform=entry['platform'], origin=entry['origin'],
dest=entry['dest'])
unsaved.append(entry)
with transaction.atomic():
for i in unsaved:
i.save()
而且,为了更好的衡量,这里是我的 celery.py:
的内容from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from shitternrailways.tasks import get_trips
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'arlyon.settings')
app = Celery('arlyon')
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(1, get_trips.s(), name="get trips")
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
最后是完整的回溯:
/Users/v/Documents/programming/venvs/arlyon/bin/python /Users/v/Documents/programming/arlyon/manage.py runserver 8000
Traceback (most recent call last):
File "/Users/v/Documents/programming/arlyon/manage.py", line 22, in <module>
execute_from_command_line(sys.argv)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/core/management/__init__.py", line 353, in execute_from_command_line
utility.execute()
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/core/management/__init__.py", line 302, in execute
settings.INSTALLED_APPS
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 55, in __getattr__
self._setup(name)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 43, in _setup
self._wrapped = Settings(settings_module)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 99, in __init__
mod = importlib.import_module(self.SETTINGS_MODULE)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 986, in _gcd_import
File "<frozen importlib._bootstrap>", line 969, in _find_and_load
File "<frozen importlib._bootstrap>", line 944, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 986, in _gcd_import
File "<frozen importlib._bootstrap>", line 969, in _find_and_load
File "<frozen importlib._bootstrap>", line 958, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 673, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 665, in exec_module
File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
File "/Users/v/Documents/programming/arlyon/arlyon/__init__.py", line 5, in <module>
from .celery import app as celery_app
File "/Users/v/Documents/programming/arlyon/arlyon/celery.py", line 4, in <module>
from shitternrailways.tasks import get_trips
File "/Users/v/Documents/programming/arlyon/shitternrailways/tasks.py", line 5, in <module>
from .models import Service
File "/Users/v/Documents/programming/arlyon/shitternrailways/models.py", line 5, in <module>
class Service(models.Model):
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/db/models/base.py", line 94, in __new__
app_config = apps.get_containing_app_config(module)
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/apps/registry.py", line 239, in get_containing_app_config
self.check_apps_ready()
File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/apps/registry.py", line 124, in check_apps_ready
raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Process finished with exit code 1
任何搜索都没有提供多少帮助,这可能是因为 celery 4 还很年轻。所以我转向你们。感谢您提供的任何提示。
您正在为任务使用@shared_task装饰器。这意味着该任务未附加到任何应用程序。但是,在 celery.py 中,您正尝试将此任务用于节拍调度程序,它会抛出错误。
我看到了两种可能的解决方法。首先,您可以将此任务从 demoapp 移至 celery.py 并将其与 @app.task 装饰器一起使用。
其次,您可以尝试从应用程序访问任务。为此,您调用了 app.autodiscover_tasks()。这意味着任务已加载到应用程序中。所以你可以尝试:
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(1, app.tasks["demoapp.get_trips"], name="get trips")
不过还需要测试一下。我没有在我的机器上试过。祝你好运!