app/tasks.py 导入模型时,Celery beat 周期性任务失败

Celery beat periodic tasks fail when the app/tasks.py imports a Model

我有一个与 celery django demo 相同的程序结构,运行良好。问题是在我的 demoapp/tasks.py 模拟中我导入了一个触发

的模型 (from .models import Service)
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

至少可以说很奇怪。计划的全部要点是通过向 api 发送请求来更新数据库。取出导入语句并注释掉适当的代码后,任务将按预期运行。这是模型的代码,但是无论模型有多少字段都会发生错误,这让我相信错误不是由于模型的编程方式造成的。我在跟踪火车。

from django.db import models

# Create your models here.

class Service(models.Model):
    id = models.TextField(primary_key=True)
    scheduled = models.DateTimeField()
    expected = models.DateTimeField(null=True, blank=True) # null means cancelled
    service = models.TextField()
    platform = models.TextField()
    origin = models.TextField()
    dest = models.TextField()

编辑:这是我的 demoapp/tasks.py 文件:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import requests
from .models import Service
from django.db import transaction
import datetime
import pytz

def clean(entry):
    def remove_location_lists(i):
        """takes care of services with multiple origins or destinations, taking only the first of each."""
        if type(i["origin"]["location"]) is list:
            i["origin"] = i["origin"]["location"][0]["locationName"]
        else:
            i["origin"] = i["origin"]["location"]["locationName"]

        if type(i["dest"]["location"]) is list:
            i["dest"] = i["dest"]["location"][0]["locationName"]
        else:
            i["dest"] = i["dest"]["location"]["locationName"]

        return i

    def parse_time(i):
        """converts the raw time string to a datetime object, and includes some logic to manage crossing midnight."""
        if i["expected"] == "Cancelled":
            i["service"] = "cancelled"
            i["expected"] = None
        elif i["expected"] == "Delayed":  # blank
            i["service"] = "delayed"
            i["expected"] = None
        else:
            i["service"] = "ontime"
            i["expected"] = pytz.utc.localize(datetime.datetime.strptime(i["scheduled"], '%H:%M'))

        i["scheduled"] = pytz.utc.localize(datetime.datetime.strptime(i["scheduled"], '%H:%M'))
        i["scheduled"] = i["scheduled"].replace(day=datetime.date.today().day, month=datetime.date.today().month,
                                                year=datetime.date.today().year)

        try:
            i["expected"] = i["expected"].replace(day=datetime.date.today().day, month=datetime.date.today().month,
                                                  year=datetime.date.today().year)

            # include day detection logic!

        except Exception as inst:
            pass

        return i

    entry = remove_location_lists(entry)
    entry = parse_time(entry)

    return entry

@shared_task
def get_trips():
    print("run task")
    url = "http://www.southernrailway.com/ajax/departures/json/"
    get = {'from': 'GTW', 'to': '', 'id': ''}

    r = requests.post(url, get)
    json = r.json()

    unsaved = []

    for i in json["arrayServices"]:
        entry = {'id': i["serviceID"],
                 'scheduled': i["std"],
                 'expected': i["etd"],
                 'service': i["service"],
                 'platform': i["platform"],
                 'origin': i["origin"],
                 'dest': i["destination"]}

        entry = clean(entry)

        entry = Service(id=entry['id'], scheduled=entry['scheduled'], expected=entry["expected"],
                        service=entry['service'], platform=entry['platform'], origin=entry['origin'],
                        dest=entry['dest'])

        unsaved.append(entry)

    with transaction.atomic():
        for i in unsaved:
            i.save()

而且,为了更好的衡量,这里是我的 celery.py:

的内容
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from shitternrailways.tasks import get_trips

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'arlyon.settings')

app = Celery('arlyon')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(1, get_trips.s(), name="get trips")

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

最后是完整的回溯:

/Users/v/Documents/programming/venvs/arlyon/bin/python /Users/v/Documents/programming/arlyon/manage.py runserver 8000
Traceback (most recent call last):
  File "/Users/v/Documents/programming/arlyon/manage.py", line 22, in <module>
    execute_from_command_line(sys.argv)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/core/management/__init__.py", line 353, in execute_from_command_line
    utility.execute()
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/core/management/__init__.py", line 302, in execute
    settings.INSTALLED_APPS
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 55, in __getattr__
    self._setup(name)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 43, in _setup
    self._wrapped = Settings(settings_module)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 99, in __init__
    mod = importlib.import_module(self.SETTINGS_MODULE)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 986, in _gcd_import
  File "<frozen importlib._bootstrap>", line 969, in _find_and_load
  File "<frozen importlib._bootstrap>", line 944, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 986, in _gcd_import
  File "<frozen importlib._bootstrap>", line 969, in _find_and_load
  File "<frozen importlib._bootstrap>", line 958, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 673, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 665, in exec_module
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "/Users/v/Documents/programming/arlyon/arlyon/__init__.py", line 5, in <module>
    from .celery import app as celery_app
  File "/Users/v/Documents/programming/arlyon/arlyon/celery.py", line 4, in <module>
    from shitternrailways.tasks import get_trips
  File "/Users/v/Documents/programming/arlyon/shitternrailways/tasks.py", line 5, in <module>
    from .models import Service
  File "/Users/v/Documents/programming/arlyon/shitternrailways/models.py", line 5, in <module>
    class Service(models.Model):
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/db/models/base.py", line 94, in __new__
    app_config = apps.get_containing_app_config(module)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/apps/registry.py", line 239, in get_containing_app_config
    self.check_apps_ready()
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/apps/registry.py", line 124, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

Process finished with exit code 1

任何搜索都没有提供多少帮助,这可能是因为 celery 4 还很年轻。所以我转向你们。感谢您提供的任何提示。

您正在为任务使用@shared_task装饰器。这意味着该任务未附加到任何应用程序。但是,在 celery.py 中,您正尝试将此任务用于节拍调度程序,它会抛出错误。

我看到了两种可能的解决方法。首先,您可以将此任务从 demoapp 移至 celery.py 并将其与 @app.task 装饰器一起使用。

其次,您可以尝试从应用程序访问任务。为此,您调用了 app.autodiscover_tasks()。这意味着任务已加载到应用程序中。所以你可以尝试:

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(1, app.tasks["demoapp.get_trips"], name="get trips")

不过还需要测试一下。我没有在我的机器上试过。祝你好运!